s4 group_audit: Add Windows Event Id's to Group membership changes
[metze/samba-autobuild/.git] / python / samba / tests / group_audit.py
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
20 """
21
22 import samba.tests
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.dcerpc.windows_event_ids import (
25     EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
26     EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
27 )
28 from samba.samdb import SamDB
29 from samba.auth import system_session
30 import os
31 from samba.tests.audit_log_base import AuditLogTestBase
32 from samba.tests import delete_force
33 import ldb
34 from ldb import FLAG_MOD_REPLACE
35
# Account that the tests add to and remove from groups. It is created in
# GroupAuditTests.setUp and deleted again in tearDown.
USER_NAME = "grpadttstuser01"
# Generated fresh on every run (minimum and maximum length both 32), so no
# fixed credential is stored in the test source.
USER_PASS = samba.generate_random_password(32, 32)

# Second account name and password; not referenced by the tests visible in
# this listing.
SECOND_USER_NAME = "grpadttstuser02"
SECOND_USER_PASS = samba.generate_random_password(32, 32)

# Groups created under cn=users of the domain for the membership tests.
GROUP_NAME_01 = "group-audit-01"
GROUP_NAME_02 = "group-audit-02"
44
45
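class GroupAuditTests(AuditLogTestBase):
    """Check the audit messages logged for group membership changes.

    Each test changes the group membership or primary group of a test
    user over LDAP and then inspects the "groupChange" messages that were
    received: the action, the user and group DNs, the client address, the
    session id and, where the original test asserted it, the Windows
    event id.
    """

    def setUp(self):
        """Connect to the server and create the test user and groups."""
        # Assigned before the base class setUp runs; presumably the base
        # class uses them to select the messages it collects -- confirm in
        # AuditLogTestBase.
        self.message_type = MSG_GROUP_LOG
        self.event_type = DSDB_GROUP_EVENT_NAME
        super(GroupAuditTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]
        self.server = os.environ["SERVER"]

        host = "ldap://%s" % self.server
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())

        # Base DN of the domain; every DN in the tests is built from it.
        self.base_dn = self.ldb.domain_dn()

        # Set "dSHeuristics" to activate the correct "userPassword"
        # behaviour, and restore the previous value when the test ends.
        dsheuristics = self.ldb.get_dsheuristics()
        self.ldb.set_dsheuristics("000000001")
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Temporarily set "minPwdAge" to "0", and restore the previous
        # value when the test ends.
        minPwdAge = self.ldb.get_minPwdAge()
        self.ldb.set_minPwdAge("0")
        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)

        # Add the test user USER_NAME with password USER_PASS. Creating
        # the user is what produces the first two audit messages that
        # both tests wait for.
        self.ldb.add({
            "dn": self._user_dn(),
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })
        self.ldb.newgroup(GROUP_NAME_01)
        self.ldb.newgroup(GROUP_NAME_02)

    def tearDown(self):
        """Remove the user and groups created by setUp."""
        super(GroupAuditTests, self).tearDown()
        delete_force(self.ldb, self._user_dn())
        self.ldb.deletegroup(GROUP_NAME_01)
        self.ldb.deletegroup(GROUP_NAME_02)

    def _user_dn(self):
        """Return the DN of the test user under cn=users."""
        return "cn=" + USER_NAME + ",cn=users," + self.base_dn

    def _group_dn(self, group_name):
        """Return the DN of the group *group_name* under cn=users."""
        return "cn=" + group_name + ",cn=users," + self.base_dn

    def _wait_for_group_messages(self, count):
        """Wait for *count* audit messages and assert that many arrived."""
        messages = self.waitForMessages(count)
        print("Received %d messages" % len(messages))
        self.assertEqual(count,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages

    def _check_group_change(self, message, action, group_name,
                            event_id=None, check_service=True):
        """Assert the contents of one "groupChange" audit message.

        :param message: one message returned by waitForMessages
        :param action: expected "action" value ("Added", "Removed" or
            "PrimaryGroup")
        :param group_name: cn of the group expected in the message
        :param event_id: expected "eventId", or None to leave the event
            id unchecked
        :param check_service: also assert that the session's service
            description is "LDAP"
        """
        audit = message["groupChange"]

        self.assertEqual(action, audit["action"])
        # These were assertTrue(expected, actual), which treats the second
        # argument as the failure message and passes for any non-empty
        # expected string, so the logged DNs were never verified. Compare
        # them, ignoring case as the original lower() calls intended.
        self.assertEqual(self._user_dn().lower(), audit["user"].lower())
        self.assertEqual(self._group_dn(group_name).lower(),
                         audit["group"].lower())
        # NOTE(review): CLIENT_IP is used as an unescaped regular
        # expression, so each "." matches any character.
        self.assertRegexpMatches(audit["remoteAddress"],
                                 self.remoteAddress)
        self.assertTrue(self.is_guid(audit["sessionId"]))
        self.assertEqual(self.get_session(), audit["sessionId"])
        if check_service:
            self.assertEqual(self.get_service_description(), "LDAP")
        if event_id is not None:
            self.assertEqual(event_id, audit["eventId"])

    def _check_user_creation_messages(self):
        """Check the two messages produced by creating the user in setUp."""
        messages = self._wait_for_group_messages(2)

        # The primary group change for the created user.
        self._check_group_change(messages[0], "PrimaryGroup", "domain users")

        # The Add message for the new user's primary group. The service
        # description is not asserted for this message.
        self._check_group_change(
            messages[1], "Added", "domain users",
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
            check_service=False)

    def test_add_and_remove_users_from_group(self):
        """Adding and removing a member logs one message per change.

        NOTE(review): apart from the user creation messages, this test
        does not assert "eventId".
        """
        self._check_user_creation_messages()

        # Add the user to a group
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        messages = self._wait_for_group_messages(1)
        self._check_group_change(messages[0], "Added", GROUP_NAME_01)

        # Add the user to another group
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_02, [USER_NAME])
        messages = self._wait_for_group_messages(1)
        self._check_group_change(messages[0], "Added", GROUP_NAME_02)

        # Remove the user from a group
        self.discardMessages()
        self.ldb.add_remove_group_members(
            GROUP_NAME_01,
            [USER_NAME],
            add_members_operation=False)
        messages = self._wait_for_group_messages(1)
        self._check_group_change(messages[0], "Removed", GROUP_NAME_01)

        # Re-add the user to a group
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        messages = self._wait_for_group_messages(1)
        self._check_group_change(messages[0], "Added", GROUP_NAME_01)

    def test_change_primary_group(self):
        """Changing a user's primary group logs three messages."""
        self._check_user_creation_messages()

        # Add the user to a group: the user needs to be a member of a
        # group before their primary group can be set to that group.
        self.discardMessages()
        self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
        messages = self._wait_for_group_messages(1)
        self._check_group_change(
            messages[0], "Added", GROUP_NAME_01,
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)

        # Change the primary group of the user: read the
        # primaryGroupToken of the group ...
        user_dn = self._user_dn()
        group_dn = self._group_dn(GROUP_NAME_01)
        res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
                              scope=ldb.SCOPE_BASE)
        group_id = res[0]["primaryGroupToken"]

        # ... and set the primaryGroupID attribute of the user to it.
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, user_dn)
        m["primaryGroupID"] = ldb.MessageElement(
            group_id,
            FLAG_MOD_REPLACE,
            "primaryGroupID")
        self.discardMessages()
        self.ldb.modify(m)

        # Wait for the primary group change.
        # Will see the user removed from the new group,
        #          the user added to their old primary group
        #          and a new primary group event.
        messages = self._wait_for_group_messages(3)

        self._check_group_change(
            messages[0], "Removed", GROUP_NAME_01,
            event_id=EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP)
        self._check_group_change(
            messages[1], "Added", "domain users",
            event_id=EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP)
        self._check_group_change(messages[2], "PrimaryGroup", GROUP_NAME_01)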