# Tests for SamDb group membership change audit logging.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
"""Tests for the SamDb logging of group membership changes."""
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.dcerpc.windows_event_ids import (
25 EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
26 EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
28 from samba.samdb import SamDB
29 from samba.auth import system_session
31 from samba.tests.audit_log_base import AuditLogTestBase
32 from samba.tests import delete_force
34 from ldb import FLAG_MOD_REPLACE
# Value used for both length arguments of samba.generate_random_password().
_PW_LEN = 32

# Account that the tests create in setUp and remove again in tearDown.
USER_NAME = "grpadttstuser01"
USER_PASS = samba.generate_random_password(_PW_LEN, _PW_LEN)

# Second account name/password pair; not referenced by the tests visible
# in this file.
SECOND_USER_NAME = "grpadttstuser02"
SECOND_USER_PASS = samba.generate_random_password(_PW_LEN, _PW_LEN)

# Groups created for every test and deleted afterwards.
GROUP_NAME_01 = "group-audit-01"
GROUP_NAME_02 = "group-audit-02"
class GroupAuditTests(AuditLogTestBase):
    """Check the "groupChange" audit messages produced by LDAP operations.

    Every test starts with a freshly created USER_NAME account and the two
    groups GROUP_NAME_01 / GROUP_NAME_02, changes group membership or the
    primary group through SamDB, and then verifies the audit messages
    returned by waitForMessages().
    """

    def setUp(self):
        """Connect to the server and create the test user and groups."""
        # Assigned ahead of the base class setUp(); presumably the base
        # class reads them there -- verify against AuditLogTestBase.
        self.message_type = MSG_GROUP_LOG
        self.event_type = DSDB_GROUP_EVENT_NAME
        super(GroupAuditTests, self).setUp()

        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())
        self.server = os.environ["SERVER"]

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour, and restore the previous value once the test is over.
        dsheuristics = self.ldb.get_dsheuristics()
        self.ldb.set_dsheuristics("000000001")
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Temporarily set "minPwdAge" to "0", restoring it on cleanup.
        min_pwd_age = self.ldb.get_minPwdAge()
        self.ldb.set_minPwdAge("0")
        self.addCleanup(self.ldb.set_minPwdAge, min_pwd_age)

        # (Re)adds the test user USER_NAME with password USER_PASS
        self.ldb.add({
            "dn": self._user_dn(),
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })
        self.ldb.newgroup(GROUP_NAME_01)
        self.ldb.newgroup(GROUP_NAME_02)

    def tearDown(self):
        """Remove the user and groups created by setUp."""
        super(GroupAuditTests, self).tearDown()
        delete_force(self.ldb, self._user_dn())
        self.ldb.deletegroup(GROUP_NAME_01)
        self.ldb.deletegroup(GROUP_NAME_02)

    def _user_dn(self):
        """Return the DN of the test user below cn=users."""
        return "cn=" + USER_NAME + ",cn=users," + self.base_dn

    def _group_dn(self, group_name):
        """Return the DN of group_name below cn=users."""
        return "cn=" + group_name + ",cn=users," + self.base_dn

    def _wait_for_group_changes(self, expected):
        """Wait for and return exactly `expected` audit messages."""
        messages = self.waitForMessages(expected)
        print("Received %d messages" % len(messages))
        self.assertEqual(expected,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages

    def _set_membership(self, group_name, add=True):
        """Add/remove the test user to/from a group; return the message.

        Pending messages are discarded first so that only the message
        caused by this change is waited for.
        """
        self.discardMessages()
        if add:
            self.ldb.add_remove_group_members(group_name, [USER_NAME])
        else:
            self.ldb.add_remove_group_members(
                group_name,
                [USER_NAME],
                add_members_operation=False)
        return self._wait_for_group_changes(1)[0]

    def _check_group_change(self, message, action, group_dn, event_id=None):
        """Verify one "groupChange" message for the test user.

        message  -- decoded audit message holding a "groupChange" entry
        action   -- expected "action" value
        group_dn -- DN of the group expected in the message
        event_id -- expected "eventId"; not checked when None
        """
        audit = message["groupChange"]

        self.assertEqual(action, audit["action"])
        # assertEqual, not assertTrue: assertTrue(a, b) takes b as the
        # failure message and passes for any non-empty a, so it would not
        # compare the DNs at all.
        self.assertEqual(self._user_dn().lower(), audit["user"].lower())
        self.assertEqual(group_dn.lower(), audit["group"].lower())
        # NOTE(review): self.remoteAddress is not assigned in this class;
        # it is expected to come from AuditLogTestBase -- confirm.
        self.assertRegexpMatches(audit["remoteAddress"],
                                 self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        if event_id is not None:
            self.assertEqual(event_id, audit["eventId"])

    def _check_ldap_service(self):
        """Verify that the service description is "LDAP"."""
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

    def test_add_and_remove_users_from_group(self):
        """Adding and removing a group member is audited each time."""
        domain_users_dn = self._group_dn("domain users")
        group_01_dn = self._group_dn(GROUP_NAME_01)
        group_02_dn = self._group_dn(GROUP_NAME_02)

        #
        # Wait for the primary group change for the created user.
        #
        messages = self._wait_for_group_changes(2)
        self._check_group_change(messages[0], "PrimaryGroup", domain_users_dn)
        self._check_ldap_service()

        # Check the Add message for the new user's primary group
        self._check_group_change(
            messages[1], "Added", domain_users_dn,
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)

        #
        # Add the user to a group
        #
        message = self._set_membership(GROUP_NAME_01)
        self._check_group_change(message, "Added", group_01_dn)
        self._check_ldap_service()

        #
        # Add the user to another group
        #
        message = self._set_membership(GROUP_NAME_02)
        self._check_group_change(message, "Added", group_02_dn)
        self._check_ldap_service()

        #
        # Remove the user from a group
        #
        message = self._set_membership(GROUP_NAME_01, add=False)
        self._check_group_change(message, "Removed", group_01_dn)
        self._check_ldap_service()

        #
        # Re-add the user to a group
        #
        message = self._set_membership(GROUP_NAME_01)
        self._check_group_change(message, "Added", group_01_dn)
        self._check_ldap_service()

    def test_change_primary_group(self):
        """Changing primaryGroupID yields Removed/Added/PrimaryGroup."""
        user_dn = self._user_dn()
        domain_users_dn = self._group_dn("domain users")
        group_dn = self._group_dn(GROUP_NAME_01)

        #
        # Wait for the primary group change for the created user.
        #
        messages = self._wait_for_group_changes(2)

        # Check the PrimaryGroup message
        self._check_group_change(messages[0], "PrimaryGroup", domain_users_dn)
        self._check_ldap_service()

        # Check the Add message for the new user's primary group
        self._check_group_change(
            messages[1], "Added", domain_users_dn,
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)

        #
        # Add the user to a group, the user needs to be a member of a group
        # before their primary group can be set to that group.
        #
        message = self._set_membership(GROUP_NAME_01)
        self._check_group_change(
            message, "Added", group_dn,
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)
        self._check_ldap_service()

        #
        # Change the primary group of a user
        #
        # get the primaryGroupToken of the group
        res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
                              scope=ldb.SCOPE_BASE)
        group_id = res[0]["primaryGroupToken"]

        # set primaryGroupID attribute of the user to that group
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, user_dn)
        m["primaryGroupID"] = ldb.MessageElement(
            str(group_id),
            FLAG_MOD_REPLACE,
            "primaryGroupID")
        self.discardMessages()
        self.ldb.modify(m)

        #
        # Wait for the primary group change.
        # Will see the user removed from the new group
        #          the user added to their old primary group
        #          and a new primary group event.
        #
        messages = self._wait_for_group_changes(3)

        self._check_group_change(
            messages[0], "Removed", group_dn,
            event_id=EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP)
        self._check_ldap_service()

        self._check_group_change(
            messages[1], "Added", domain_users_dn,
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)
        self._check_ldap_service()

        self._check_group_change(messages[2], "PrimaryGroup", group_dn)
        self._check_ldap_service()