# Tests for SamDb group membership change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
"""Tests for the SamDb logging of group membership changes.
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.samdb import SamDB
25 from samba.auth import system_session
27 from samba.tests.audit_log_base import AuditLogTestBase
28 from samba.tests import delete_force
30 from ldb import FLAG_MOD_REPLACE
# Arguments handed to samba.generate_random_password() for every test
# account password in this module.
_RANDOM_PASSWORD_ARGS = (32, 32)

# Account that setUp creates and that the tests add to / remove from groups.
USER_NAME = "grpadttstuser01"
USER_PASS = samba.generate_random_password(*_RANDOM_PASSWORD_ARGS)

# Second account name and password; not referenced by the tests visible in
# this module.
SECOND_USER_NAME = "grpadttstuser02"
SECOND_USER_PASS = samba.generate_random_password(*_RANDOM_PASSWORD_ARGS)

# Groups created in setUp and deleted again in tearDown.
GROUP_NAME_01 = "group-audit-01"
GROUP_NAME_02 = "group-audit-02"
class GroupAuditTests(AuditLogTestBase):
    """Tests for the group-change audit messages produced by LDAP operations.

    setUp() creates the test user USER_NAME and the groups GROUP_NAME_01 and
    GROUP_NAME_02.  Each test then changes group membership (or the primary
    group) over LDAP and checks the "groupChange" payload of the messages
    handed back by waitForMessages().
    """

    # cn of the group the tests expect a newly created user to have as its
    # primary group.
    _DOMAIN_USERS_CN = "domain users"

    def setUp(self):
        """Connect to the server and create the test user and groups.

        Reads the CLIENT_IP, SERVER_IP and SERVER environment variables; a
        missing variable raises KeyError.
        """
        # Assigned before the base class setUp runs; presumably the base
        # class uses them to select which messages to collect - confirm
        # against AuditLogTestBase.
        self.message_type = MSG_GROUP_LOG
        self.event_type = DSDB_GROUP_EVENT_NAME
        super(GroupAuditTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]
        self.server = os.environ["SERVER"]

        host = "ldap://%s" % self.server
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb.get_dsheuristics()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour
        self.ldb.set_dsheuristics("000000001")

        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Get the old "minPwdAge"
        min_pwd_age = self.ldb.get_minPwdAge()

        # Set it temporarily to "0"
        self.ldb.set_minPwdAge("0")

        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb.set_minPwdAge, min_pwd_age)

        # (Re)adds the test user USER_NAME with password USER_PASS.
        # Both tests start by waiting for the "PrimaryGroup" message that
        # this add produces.
        self.ldb.add({
            "dn": self._dn(USER_NAME),
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })
        self.ldb.newgroup(GROUP_NAME_01)
        self.ldb.newgroup(GROUP_NAME_02)

    def tearDown(self):
        """Run the base class tearDown, then delete the test user and groups."""
        super(GroupAuditTests, self).tearDown()
        delete_force(self.ldb, self._dn(USER_NAME))
        self.ldb.deletegroup(GROUP_NAME_01)
        self.ldb.deletegroup(GROUP_NAME_02)

    def _dn(self, cn):
        """Return the DN of the object named `cn` in the users container."""
        return "cn=" + cn + ",cn=users," + self.base_dn

    def _wait_for_group_changes(self, expected):
        """Wait for `expected` messages and return their "groupChange" parts.

        Fails the test if the number of messages received differs from
        `expected`.
        """
        messages = self.waitForMessages(expected)
        print("Received %d messages" % len(messages))
        self.assertEqual(expected,
                         len(messages),
                         "Did not receive the expected number of messages")
        return [message["groupChange"] for message in messages]

    def _assert_group_change(self, audit, action, group_cn):
        """Check one "groupChange" record for the test user.

        `audit` is the record, `action` the expected value of its "action"
        field and `group_cn` the cn of the group expected in its "group"
        field.
        """
        self.assertEqual(action, audit["action"])

        # DNs are compared case-insensitively.  assertEqual is required
        # here: assertTrue(a, b) treats b as the failure message and so
        # never compares the two values.
        self.assertEqual(self._dn(USER_NAME).lower(), audit["user"].lower())
        self.assertEqual(self._dn(group_cn).lower(), audit["group"].lower())

        self.assertRegexpMatches(audit["remoteAddress"],
                                 self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")

    def test_add_and_remove_users_from_group(self):

        #
        # Wait for the primary group change for the created user.
        #
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit,
                                  "PrimaryGroup",
                                  self._DOMAIN_USERS_CN)

        #
        # Add the user to a group
        #
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit, "Added", GROUP_NAME_01)

        #
        # Add the user to another group
        #
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_02, [USER_NAME])
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit, "Added", GROUP_NAME_02)

        #
        # Remove the user from a group
        #
        self.discardMessages()
        self.ldb.add_remove_group_members(
            GROUP_NAME_01,
            [USER_NAME],
            add_members_operation=False)
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit, "Removed", GROUP_NAME_01)

        #
        # Re-add the user to a group
        #
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit, "Added", GROUP_NAME_01)

    def test_change_primary_group(self):

        #
        # Wait for the primary group change for the created user.
        #
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit,
                                  "PrimaryGroup",
                                  self._DOMAIN_USERS_CN)

        #
        # Add the user to a group, the user needs to be a member of a group
        # before their primary group can be set to that group.
        #
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        audit = self._wait_for_group_changes(1)[0]
        self._assert_group_change(audit, "Added", GROUP_NAME_01)

        #
        # Change the primary group of a user
        #
        user_dn = self._dn(USER_NAME)
        group_dn = self._dn(GROUP_NAME_01)
        # get the primaryGroupToken of the group
        res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
                              scope=ldb.SCOPE_BASE)
        group_id = res[0]["primaryGroupToken"]

        # set primaryGroupID attribute of the user to that group
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, user_dn)
        m["primaryGroupID"] = ldb.MessageElement(
            group_id,
            FLAG_MOD_REPLACE,
            "primaryGroupID")
        self.discardMessages()
        self.ldb.modify(m)

        #
        # Wait for the primary group change.
        # Will see the user removed from the new group
        #          the user added to their old primary group
        #          and a new primary group event.
        #
        removed, added, primary = self._wait_for_group_changes(3)
        self._assert_group_change(removed, "Removed", GROUP_NAME_01)
        self._assert_group_change(added, "Added", self._DOMAIN_USERS_CN)
        self._assert_group_change(primary, "PrimaryGroup", GROUP_NAME_01)