pytests: heed assertEquals deprecation warning en-masse
[samba.git] / python / samba / tests / group_audit.py
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
20 """
21
22 import samba.tests
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.dcerpc.windows_event_ids import (
25     EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
26     EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
27 )
28 from samba.samdb import SamDB
29 from samba.auth import system_session
30 import os
31 from samba.tests.audit_log_base import AuditLogTestBase
32 from samba.tests import delete_force
33 import ldb
34 from ldb import FLAG_MOD_REPLACE
35
36 USER_NAME = "grpadttstuser01"
37 USER_PASS = samba.generate_random_password(32, 32)
38
39 SECOND_USER_NAME = "grpadttstuser02"
40 SECOND_USER_PASS = samba.generate_random_password(32, 32)
41
42 GROUP_NAME_01 = "group-audit-01"
43 GROUP_NAME_02 = "group-audit-02"
44
45
46 class GroupAuditTests(AuditLogTestBase):
47
48     def setUp(self):
49         self.message_type = MSG_GROUP_LOG
50         self.event_type = DSDB_GROUP_EVENT_NAME
51         super(GroupAuditTests, self).setUp()
52
53         self.server_ip = os.environ["SERVER_IP"]
54
55         host = "ldap://%s" % os.environ["SERVER"]
56         self.ldb = SamDB(url=host,
57                          session_info=system_session(),
58                          credentials=self.get_credentials(),
59                          lp=self.get_loadparm())
60         self.server = os.environ["SERVER"]
61
62         # Gets back the basedn
63         self.base_dn = self.ldb.domain_dn()
64
65         # Get the old "dSHeuristics" if it was set
66         dsheuristics = self.ldb.get_dsheuristics()
67
68         # Set the "dSHeuristics" to activate the correct "userPassword"
69         # behaviour
70         self.ldb.set_dsheuristics("000000001")
71
72         # Reset the "dSHeuristics" as they were before
73         self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
74
75         # Get the old "minPwdAge"
76         minPwdAge = self.ldb.get_minPwdAge()
77
78         # Set it temporarily to "0"
79         self.ldb.set_minPwdAge("0")
80         self.base_dn = self.ldb.domain_dn()
81
82         # Reset the "minPwdAge" as it was before
83         self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
84
85         # (Re)adds the test user USER_NAME with password USER_PASS
86         self.ldb.add({
87             "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
88             "objectclass": "user",
89             "sAMAccountName": USER_NAME,
90             "userPassword": USER_PASS
91         })
92         self.ldb.newgroup(GROUP_NAME_01)
93         self.ldb.newgroup(GROUP_NAME_02)
94
95     def tearDown(self):
96         super(GroupAuditTests, self).tearDown()
97         delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
98         self.ldb.deletegroup(GROUP_NAME_01)
99         self.ldb.deletegroup(GROUP_NAME_02)
100
101     def test_add_and_remove_users_from_group(self):
102
103         #
104         # Wait for the primary group change for the created user.
105         #
106         messages = self.waitForMessages(2)
107         print("Received %d messages" % len(messages))
108         self.assertEqual(2,
109                           len(messages),
110                           "Did not receive the expected number of messages")
111         audit = messages[0]["groupChange"]
112
113         self.assertEqual("PrimaryGroup", audit["action"])
114         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
115         group_dn = "cn=domain users,cn=users," + self.base_dn
116         self.assertTrue(user_dn.lower(), audit["user"].lower())
117         self.assertTrue(group_dn.lower(), audit["group"].lower())
118         self.assertRegexpMatches(audit["remoteAddress"],
119                                  self.remoteAddress)
120         self.assertTrue(self.is_guid(audit["sessionId"]))
121         session_id = self.get_session()
122         self.assertEqual(session_id, audit["sessionId"])
123         service_description = self.get_service_description()
124         self.assertEqual(service_description, "LDAP")
125
126         # Check the Add message for the new users primary group
127         audit = messages[1]["groupChange"]
128
129         self.assertEqual("Added", audit["action"])
130         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
131         group_dn = "cn=domain users,cn=users," + self.base_dn
132         self.assertTrue(user_dn.lower(), audit["user"].lower())
133         self.assertTrue(group_dn.lower(), audit["group"].lower())
134         self.assertRegexpMatches(audit["remoteAddress"],
135                                  self.remoteAddress)
136         self.assertTrue(self.is_guid(audit["sessionId"]))
137         session_id = self.get_session()
138         self.assertEqual(session_id, audit["sessionId"])
139         self.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
140                           audit["eventId"])
141         #
142         # Add the user to a group
143         #
144         self.discardMessages()
145
146         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
147         messages = self.waitForMessages(1)
148         print("Received %d messages" % len(messages))
149         self.assertEqual(1,
150                           len(messages),
151                           "Did not receive the expected number of messages")
152         audit = messages[0]["groupChange"]
153
154         self.assertEqual("Added", audit["action"])
155         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
156         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
157         self.assertTrue(user_dn.lower(), audit["user"].lower())
158         self.assertTrue(group_dn.lower(), audit["group"].lower())
159         self.assertRegexpMatches(audit["remoteAddress"],
160                                  self.remoteAddress)
161         self.assertTrue(self.is_guid(audit["sessionId"]))
162         session_id = self.get_session()
163         self.assertEqual(session_id, audit["sessionId"])
164         service_description = self.get_service_description()
165         self.assertEqual(service_description, "LDAP")
166
167         #
168         # Add the user to another group
169         #
170         self.discardMessages()
171         self.ldb.add_remove_group_members(GROUP_NAME_02, [USER_NAME])
172
173         messages = self.waitForMessages(1)
174         print("Received %d messages" % len(messages))
175         self.assertEqual(1,
176                           len(messages),
177                           "Did not receive the expected number of messages")
178         audit = messages[0]["groupChange"]
179
180         self.assertEqual("Added", audit["action"])
181         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
182         group_dn = "cn=" + GROUP_NAME_02 + ",cn=users," + self.base_dn
183         self.assertTrue(user_dn.lower(), audit["user"].lower())
184         self.assertTrue(group_dn.lower(), audit["group"].lower())
185         self.assertRegexpMatches(audit["remoteAddress"],
186                                  self.remoteAddress)
187         self.assertTrue(self.is_guid(audit["sessionId"]))
188         session_id = self.get_session()
189         self.assertEqual(session_id, audit["sessionId"])
190         service_description = self.get_service_description()
191         self.assertEqual(service_description, "LDAP")
192
193         #
194         # Remove the user from a group
195         #
196         self.discardMessages()
197         self.ldb.add_remove_group_members(
198             GROUP_NAME_01,
199             [USER_NAME],
200             add_members_operation=False)
201         messages = self.waitForMessages(1)
202         print("Received %d messages" % len(messages))
203         self.assertEqual(1,
204                           len(messages),
205                           "Did not receive the expected number of messages")
206         audit = messages[0]["groupChange"]
207
208         self.assertEqual("Removed", audit["action"])
209         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
210         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
211         self.assertTrue(user_dn.lower(), audit["user"].lower())
212         self.assertTrue(group_dn.lower(), audit["group"].lower())
213         self.assertRegexpMatches(audit["remoteAddress"],
214                                  self.remoteAddress)
215         self.assertTrue(self.is_guid(audit["sessionId"]))
216         session_id = self.get_session()
217         self.assertEqual(session_id, audit["sessionId"])
218         service_description = self.get_service_description()
219         self.assertEqual(service_description, "LDAP")
220
221         #
222         # Re-add the user to a group
223         #
224         self.discardMessages()
225         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
226
227         messages = self.waitForMessages(1)
228         print("Received %d messages" % len(messages))
229         self.assertEqual(1,
230                           len(messages),
231                           "Did not receive the expected number of messages")
232         audit = messages[0]["groupChange"]
233
234         self.assertEqual("Added", audit["action"])
235         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
236         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
237         self.assertTrue(user_dn.lower(), audit["user"].lower())
238         self.assertTrue(group_dn.lower(), audit["group"].lower())
239         self.assertRegexpMatches(audit["remoteAddress"],
240                                  self.remoteAddress)
241         self.assertTrue(self.is_guid(audit["sessionId"]))
242         session_id = self.get_session()
243         self.assertEqual(session_id, audit["sessionId"])
244         service_description = self.get_service_description()
245         self.assertEqual(service_description, "LDAP")
246
247     def test_change_primary_group(self):
248
249         #
250         # Wait for the primary group change for the created user.
251         #
252         messages = self.waitForMessages(2)
253         print("Received %d messages" % len(messages))
254         self.assertEqual(2,
255                           len(messages),
256                           "Did not receive the expected number of messages")
257
258         # Check the PrimaryGroup message
259         audit = messages[0]["groupChange"]
260
261         self.assertEqual("PrimaryGroup", audit["action"])
262         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
263         group_dn = "cn=domain users,cn=users," + self.base_dn
264         self.assertTrue(user_dn.lower(), audit["user"].lower())
265         self.assertTrue(group_dn.lower(), audit["group"].lower())
266         self.assertRegexpMatches(audit["remoteAddress"],
267                                  self.remoteAddress)
268         self.assertTrue(self.is_guid(audit["sessionId"]))
269         session_id = self.get_session()
270         self.assertEqual(session_id, audit["sessionId"])
271         service_description = self.get_service_description()
272         self.assertEqual(service_description, "LDAP")
273
274         # Check the Add message for the new users primary group
275         audit = messages[1]["groupChange"]
276
277         self.assertEqual("Added", audit["action"])
278         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
279         group_dn = "cn=domain users,cn=users," + self.base_dn
280         self.assertTrue(user_dn.lower(), audit["user"].lower())
281         self.assertTrue(group_dn.lower(), audit["group"].lower())
282         self.assertRegexpMatches(audit["remoteAddress"],
283                                  self.remoteAddress)
284         self.assertTrue(self.is_guid(audit["sessionId"]))
285         session_id = self.get_session()
286         self.assertEqual(session_id, audit["sessionId"])
287         self.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
288                           audit["eventId"])
289
290         #
291         # Add the user to a group, the user needs to be a member of a group
292         # before there primary group can be set to that group.
293         #
294         self.discardMessages()
295
296         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
297         messages = self.waitForMessages(1)
298         print("Received %d messages" % len(messages))
299         self.assertEqual(1,
300                           len(messages),
301                           "Did not receive the expected number of messages")
302         audit = messages[0]["groupChange"]
303
304         self.assertEqual("Added", audit["action"])
305         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
306         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
307         self.assertTrue(user_dn.lower(), audit["user"].lower())
308         self.assertTrue(group_dn.lower(), audit["group"].lower())
309         self.assertRegexpMatches(audit["remoteAddress"],
310                                  self.remoteAddress)
311         self.assertTrue(self.is_guid(audit["sessionId"]))
312         session_id = self.get_session()
313         self.assertEqual(session_id, audit["sessionId"])
314         service_description = self.get_service_description()
315         self.assertEqual(service_description, "LDAP")
316         self.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
317                           audit["eventId"])
318
319         #
320         # Change the primary group of a user
321         #
322         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
323         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
324         # get the primaryGroupToken of the group
325         res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
326                               scope=ldb.SCOPE_BASE)
327         group_id = res[0]["primaryGroupToken"]
328
329         # set primaryGroupID attribute of the user to that group
330         m = ldb.Message()
331         m.dn = ldb.Dn(self.ldb, user_dn)
332         m["primaryGroupID"] = ldb.MessageElement(
333             group_id,
334             FLAG_MOD_REPLACE,
335             "primaryGroupID")
336         self.discardMessages()
337         self.ldb.modify(m)
338
339         #
340         # Wait for the primary group change.
341         # Will see the user removed from the new group
342         #          the user added to their old primary group
343         #          and a new primary group event.
344         #
345         messages = self.waitForMessages(3)
346         print("Received %d messages" % len(messages))
347         self.assertEqual(3,
348                           len(messages),
349                           "Did not receive the expected number of messages")
350
351         audit = messages[0]["groupChange"]
352         self.assertEqual("Removed", audit["action"])
353         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
354         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
355         self.assertTrue(user_dn.lower(), audit["user"].lower())
356         self.assertTrue(group_dn.lower(), audit["group"].lower())
357         self.assertRegexpMatches(audit["remoteAddress"],
358                                  self.remoteAddress)
359         self.assertTrue(self.is_guid(audit["sessionId"]))
360         session_id = self.get_session()
361         self.assertEqual(session_id, audit["sessionId"])
362         service_description = self.get_service_description()
363         self.assertEqual(service_description, "LDAP")
364         self.assertEqual(EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP,
365                           audit["eventId"])
366
367         audit = messages[1]["groupChange"]
368
369         self.assertEqual("Added", audit["action"])
370         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
371         group_dn = "cn=domain users,cn=users," + self.base_dn
372         self.assertTrue(user_dn.lower(), audit["user"].lower())
373         self.assertTrue(group_dn.lower(), audit["group"].lower())
374         self.assertRegexpMatches(audit["remoteAddress"],
375                                  self.remoteAddress)
376         self.assertTrue(self.is_guid(audit["sessionId"]))
377         session_id = self.get_session()
378         self.assertEqual(session_id, audit["sessionId"])
379         service_description = self.get_service_description()
380         self.assertEqual(service_description, "LDAP")
381         self.assertEqual(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
382                           audit["eventId"])
383
384         audit = messages[2]["groupChange"]
385
386         self.assertEqual("PrimaryGroup", audit["action"])
387         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
388         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
389         self.assertTrue(user_dn.lower(), audit["user"].lower())
390         self.assertTrue(group_dn.lower(), audit["group"].lower())
391         self.assertRegexpMatches(audit["remoteAddress"],
392                                  self.remoteAddress)
393         self.assertTrue(self.is_guid(audit["sessionId"]))
394         session_id = self.get_session()
395         self.assertEqual(session_id, audit["sessionId"])
396         service_description = self.get_service_description()
397         self.assertEqual(service_description, "LDAP")