53a8bf6afaf686bad402c29e3f5dfc53b13d85c0
[metze/samba-autobuild/.git] / python / samba / tests / group_audit.py
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
20 """
21
22 import samba.tests
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 import os
27 from samba.tests.audit_log_base import AuditLogTestBase
28 from samba.tests import delete_force
29 import ldb
30 from ldb import FLAG_MOD_REPLACE
31
32 USER_NAME = "grpadttstuser01"
33 USER_PASS = samba.generate_random_password(32, 32)
34
35 SECOND_USER_NAME = "grpadttstuser02"
36 SECOND_USER_PASS = samba.generate_random_password(32, 32)
37
38 GROUP_NAME_01 = "group-audit-01"
39 GROUP_NAME_02 = "group-audit-02"
40
41
42 class GroupAuditTests(AuditLogTestBase):
43
44     def setUp(self):
45         self.message_type = MSG_GROUP_LOG
46         self.event_type   = DSDB_GROUP_EVENT_NAME
47         super(GroupAuditTests, self).setUp()
48
49         self.remoteAddress = os.environ["CLIENT_IP"]
50         self.server_ip = os.environ["SERVER_IP"]
51
52         host = "ldap://%s" % os.environ["SERVER"]
53         self.ldb = SamDB(url=host,
54                          session_info=system_session(),
55                          credentials=self.get_credentials(),
56                          lp=self.get_loadparm())
57         self.server = os.environ["SERVER"]
58
59         # Gets back the basedn
60         self.base_dn = self.ldb.domain_dn()
61
62         # Get the old "dSHeuristics" if it was set
63         dsheuristics = self.ldb.get_dsheuristics()
64
65         # Set the "dSHeuristics" to activate the correct "userPassword"
66         # behaviour
67         self.ldb.set_dsheuristics("000000001")
68
69         # Reset the "dSHeuristics" as they were before
70         self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
71
72         # Get the old "minPwdAge"
73         minPwdAge = self.ldb.get_minPwdAge()
74
75         # Set it temporarily to "0"
76         self.ldb.set_minPwdAge("0")
77         self.base_dn = self.ldb.domain_dn()
78
79         # Reset the "minPwdAge" as it was before
80         self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
81
82         # (Re)adds the test user USER_NAME with password USER_PASS
83         self.ldb.add({
84             "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
85             "objectclass": "user",
86             "sAMAccountName": USER_NAME,
87             "userPassword": USER_PASS
88         })
89         self.ldb.newgroup(GROUP_NAME_01)
90         self.ldb.newgroup(GROUP_NAME_02)
91
92     def tearDown(self):
93         super(GroupAuditTests, self).tearDown()
94         delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
95         self.ldb.deletegroup(GROUP_NAME_01)
96         self.ldb.deletegroup(GROUP_NAME_02)
97
98     def test_add_and_remove_users_from_group(self):
99
100         #
101         # Wait for the primary group change for the created user.
102         #
103         messages = self.waitForMessages(1)
104         print("Received %d messages" % len(messages))
105         self.assertEquals(1,
106                           len(messages),
107                           "Did not receive the expected number of messages")
108         audit = messages[0]["groupChange"]
109
110         self.assertEqual("PrimaryGroup", audit["action"])
111         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
112         group_dn = "cn=domain users,cn=users," + self.base_dn
113         self.assertTrue(user_dn.lower(), audit["user"].lower())
114         self.assertTrue(group_dn.lower(), audit["group"].lower())
115         self.assertRegexpMatches(audit["remoteAddress"],
116                                  self.remoteAddress)
117         self.assertTrue(self.is_guid(audit["sessionId"]))
118         session_id = self.get_session()
119         self.assertEquals(session_id, audit["sessionId"])
120         service_description = self.get_service_description()
121         self.assertEquals(service_description, "LDAP")
122
123         #
124         # Add the user to a group
125         #
126         self.discardMessages()
127
128         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
129         messages = self.waitForMessages(1)
130         print("Received %d messages" % len(messages))
131         self.assertEquals(1,
132                           len(messages),
133                           "Did not receive the expected number of messages")
134         audit = messages[0]["groupChange"]
135
136         self.assertEqual("Added", audit["action"])
137         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
138         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
139         self.assertTrue(user_dn.lower(), audit["user"].lower())
140         self.assertTrue(group_dn.lower(), audit["group"].lower())
141         self.assertRegexpMatches(audit["remoteAddress"],
142                                  self.remoteAddress)
143         self.assertTrue(self.is_guid(audit["sessionId"]))
144         session_id = self.get_session()
145         self.assertEquals(session_id, audit["sessionId"])
146         service_description = self.get_service_description()
147         self.assertEquals(service_description, "LDAP")
148
149         #
150         # Add the user to another group
151         #
152         self.discardMessages()
153         self.ldb.add_remove_group_members(GROUP_NAME_02, [USER_NAME])
154
155         messages = self.waitForMessages(1)
156         print("Received %d messages" % len(messages))
157         self.assertEquals(1,
158                           len(messages),
159                           "Did not receive the expected number of messages")
160         audit = messages[0]["groupChange"]
161
162         self.assertEqual("Added", audit["action"])
163         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
164         group_dn = "cn=" + GROUP_NAME_02 + ",cn=users," + self.base_dn
165         self.assertTrue(user_dn.lower(), audit["user"].lower())
166         self.assertTrue(group_dn.lower(), audit["group"].lower())
167         self.assertRegexpMatches(audit["remoteAddress"],
168                                  self.remoteAddress)
169         self.assertTrue(self.is_guid(audit["sessionId"]))
170         session_id = self.get_session()
171         self.assertEquals(session_id, audit["sessionId"])
172         service_description = self.get_service_description()
173         self.assertEquals(service_description, "LDAP")
174
175         #
176         # Remove the user from a group
177         #
178         self.discardMessages()
179         self.ldb.add_remove_group_members(
180             GROUP_NAME_01,
181             [USER_NAME],
182             add_members_operation=False)
183         messages = self.waitForMessages(1)
184         print("Received %d messages" % len(messages))
185         self.assertEquals(1,
186                           len(messages),
187                           "Did not receive the expected number of messages")
188         audit = messages[0]["groupChange"]
189
190         self.assertEqual("Removed", audit["action"])
191         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
192         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
193         self.assertTrue(user_dn.lower(), audit["user"].lower())
194         self.assertTrue(group_dn.lower(), audit["group"].lower())
195         self.assertRegexpMatches(audit["remoteAddress"],
196                                  self.remoteAddress)
197         self.assertTrue(self.is_guid(audit["sessionId"]))
198         session_id = self.get_session()
199         self.assertEquals(session_id, audit["sessionId"])
200         service_description = self.get_service_description()
201         self.assertEquals(service_description, "LDAP")
202
203         #
204         # Re-add the user to a group
205         #
206         self.discardMessages()
207         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
208
209         messages = self.waitForMessages(1)
210         print("Received %d messages" % len(messages))
211         self.assertEquals(1,
212                           len(messages),
213                           "Did not receive the expected number of messages")
214         audit = messages[0]["groupChange"]
215
216         self.assertEqual("Added", audit["action"])
217         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
218         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
219         self.assertTrue(user_dn.lower(), audit["user"].lower())
220         self.assertTrue(group_dn.lower(), audit["group"].lower())
221         self.assertRegexpMatches(audit["remoteAddress"],
222                                  self.remoteAddress)
223         self.assertTrue(self.is_guid(audit["sessionId"]))
224         session_id = self.get_session()
225         self.assertEquals(session_id, audit["sessionId"])
226         service_description = self.get_service_description()
227         self.assertEquals(service_description, "LDAP")
228
229     def test_change_primary_group(self):
230
231         #
232         # Wait for the primary group change for the created user.
233         #
234         messages = self.waitForMessages(1)
235         print("Received %d messages" % len(messages))
236         self.assertEquals(1,
237                           len(messages),
238                           "Did not receive the expected number of messages")
239         audit = messages[0]["groupChange"]
240
241         self.assertEqual("PrimaryGroup", audit["action"])
242         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
243         group_dn = "cn=domain users,cn=users," + self.base_dn
244         self.assertTrue(user_dn.lower(), audit["user"].lower())
245         self.assertTrue(group_dn.lower(), audit["group"].lower())
246         self.assertRegexpMatches(audit["remoteAddress"],
247                                  self.remoteAddress)
248         self.assertTrue(self.is_guid(audit["sessionId"]))
249         session_id = self.get_session()
250         self.assertEquals(session_id, audit["sessionId"])
251         service_description = self.get_service_description()
252         self.assertEquals(service_description, "LDAP")
253
254         #
255         # Add the user to a group, the user needs to be a member of a group
256         # before there primary group can be set to that group.
257         #
258         self.discardMessages()
259
260         self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
261         messages = self.waitForMessages(1)
262         print("Received %d messages" % len(messages))
263         self.assertEquals(1,
264                           len(messages),
265                           "Did not receive the expected number of messages")
266         audit = messages[0]["groupChange"]
267
268         self.assertEqual("Added", audit["action"])
269         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
270         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
271         self.assertTrue(user_dn.lower(), audit["user"].lower())
272         self.assertTrue(group_dn.lower(), audit["group"].lower())
273         self.assertRegexpMatches(audit["remoteAddress"],
274                                  self.remoteAddress)
275         self.assertTrue(self.is_guid(audit["sessionId"]))
276         session_id = self.get_session()
277         self.assertEquals(session_id, audit["sessionId"])
278         service_description = self.get_service_description()
279         self.assertEquals(service_description, "LDAP")
280
281         #
282         # Change the primary group of a user
283         #
284         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
285         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
286         # get the primaryGroupToken of the group
287         res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
288                               scope=ldb.SCOPE_BASE)
289         group_id = res[0]["primaryGroupToken"]
290
291         # set primaryGroupID attribute of the user to that group
292         m = ldb.Message()
293         m.dn = ldb.Dn(self.ldb, user_dn)
294         m["primaryGroupID"] = ldb.MessageElement(
295             group_id,
296             FLAG_MOD_REPLACE,
297             "primaryGroupID")
298         self.discardMessages()
299         self.ldb.modify(m)
300
301         #
302         # Wait for the primary group change.
303         # Will see the user removed from the new group
304         #          the user added to their old primary group
305         #          and a new primary group event.
306         #
307         messages = self.waitForMessages(3)
308         print("Received %d messages" % len(messages))
309         self.assertEquals(3,
310                           len(messages),
311                           "Did not receive the expected number of messages")
312
313         audit = messages[0]["groupChange"]
314         self.assertEqual("Removed", audit["action"])
315         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
316         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
317         self.assertTrue(user_dn.lower(), audit["user"].lower())
318         self.assertTrue(group_dn.lower(), audit["group"].lower())
319         self.assertRegexpMatches(audit["remoteAddress"],
320                                  self.remoteAddress)
321         self.assertTrue(self.is_guid(audit["sessionId"]))
322         session_id = self.get_session()
323         self.assertEquals(session_id, audit["sessionId"])
324         service_description = self.get_service_description()
325         self.assertEquals(service_description, "LDAP")
326
327         audit = messages[1]["groupChange"]
328
329         self.assertEqual("Added", audit["action"])
330         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
331         group_dn = "cn=domain users,cn=users," + self.base_dn
332         self.assertTrue(user_dn.lower(), audit["user"].lower())
333         self.assertTrue(group_dn.lower(), audit["group"].lower())
334         self.assertRegexpMatches(audit["remoteAddress"],
335                                  self.remoteAddress)
336         self.assertTrue(self.is_guid(audit["sessionId"]))
337         session_id = self.get_session()
338         self.assertEquals(session_id, audit["sessionId"])
339         service_description = self.get_service_description()
340         self.assertEquals(service_description, "LDAP")
341
342         audit = messages[2]["groupChange"]
343
344         self.assertEqual("PrimaryGroup", audit["action"])
345         user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
346         group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
347         self.assertTrue(user_dn.lower(), audit["user"].lower())
348         self.assertTrue(group_dn.lower(), audit["group"].lower())
349         self.assertRegexpMatches(audit["remoteAddress"],
350                                  self.remoteAddress)
351         self.assertTrue(self.is_guid(audit["sessionId"]))
352         session_id = self.get_session()
353         self.assertEquals(session_id, audit["sessionId"])
354         service_description = self.get_service_description()
355         self.assertEquals(service_description, "LDAP")