python: tests: update all super calls to python 3 style in tests
[samba.git] / python / samba / tests / audit_log_dsdb.py
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for the SamDb logging of password changes.
19 """
20
21 import samba.tests
22 from samba.dcerpc.messaging import MSG_DSDB_LOG, DSDB_EVENT_NAME
23 from ldb import ERR_NO_SUCH_OBJECT
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 import os
27 import time
28 from samba.tests.audit_log_base import AuditLogTestBase
29 from samba.tests import delete_force
30 from samba.net import Net
31 import samba
32 from samba.dcerpc import security, lsa
33
34 USER_NAME = "auditlogtestuser"
35 USER_PASS = samba.generate_random_password(32, 32)
36
37
38 class AuditLogDsdbTests(AuditLogTestBase):
39
40     def setUp(self):
41         self.message_type = MSG_DSDB_LOG
42         self.event_type = DSDB_EVENT_NAME
43         super().setUp()
44
45         self.server_ip = os.environ["SERVER_IP"]
46
47         host = "ldap://%s" % os.environ["SERVER"]
48         self.ldb = SamDB(url=host,
49                          session_info=system_session(),
50                          credentials=self.get_credentials(),
51                          lp=self.get_loadparm())
52         self.server = os.environ["SERVER"]
53
54         # Gets back the basedn
55         self.base_dn = self.ldb.domain_dn()
56
57         # Get the old "dSHeuristics" if it was set
58         dsheuristics = self.ldb.get_dsheuristics()
59
60         # Set the "dSHeuristics" to activate the correct "userPassword"
61         # behaviour
62         self.ldb.set_dsheuristics("000000001")
63
64         # Reset the "dSHeuristics" as they were before
65         self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
66
67         # Get the old "minPwdAge"
68         minPwdAge = self.ldb.get_minPwdAge()
69
70         # Set it temporarily to "0"
71         self.ldb.set_minPwdAge("0")
72         self.base_dn = self.ldb.domain_dn()
73
74         # Reset the "minPwdAge" as it was before
75         self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
76
77         # (Re)adds the test user USER_NAME with password USER_PASS
78         delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
79         self.ldb.add({
80             "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
81             "objectclass": "user",
82             "sAMAccountName": USER_NAME,
83             "userPassword": USER_PASS
84         })
85
86     #
87     # Discard the messages from the setup code
88     #
89     def discardSetupMessages(self, dn):
90         self.waitForMessages(2, dn=dn)
91         self.discardMessages()
92
93     def tearDown(self):
94         self.discardMessages()
95         super().tearDown()
96
97     def haveExpectedTxn(self, expected):
98         if self.context["txnMessage"] is not None:
99             txn = self.context["txnMessage"]["dsdbTransaction"]
100             if txn["transactionId"] == expected:
101                 return True
102         return False
103
104     def waitForTransaction(self, expected, connection=None):
105         """Wait for a transaction message to arrive
106         The connection is passed through to keep the connection alive
107         until all the logging messages have been received.
108         """
109
110         self.connection = connection
111
112         start_time = time.time()
113         while not self.haveExpectedTxn(expected):
114             self.msg_ctx.loop_once(0.1)
115             if time.time() - start_time > 1:
116                 self.connection = None
117                 return ""
118
119         self.connection = None
120         return self.context["txnMessage"]
121
122     def test_net_change_password(self):
123
124         dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
125         self.discardSetupMessages(dn)
126
127         creds = self.insta_creds(template=self.get_credentials())
128
129         lp = self.get_loadparm()
130         net = Net(creds, lp, server=self.server)
131         password = "newPassword!!42"
132
133         net.change_password(newpassword=password,
134                             username=USER_NAME,
135                             oldpassword=USER_PASS)
136
137         messages = self.waitForMessages(1, net, dn=dn)
138         print("Received %d messages" % len(messages))
139         self.assertEqual(1,
140                           len(messages),
141                           "Did not receive the expected number of messages")
142
143         audit = messages[0]["dsdbChange"]
144         self.assertEqual("Modify", audit["operation"])
145         self.assertFalse(audit["performedAsSystem"])
146         self.assertTrue(dn.lower(), audit["dn"].lower())
147         self.assertRegex(audit["remoteAddress"],
148                          self.remoteAddress)
149         session_id = self.get_session()
150         self.assertEqual(session_id, audit["sessionId"])
151         # We skip the check for self.get_service_description() as this
152         # is subject to a race between smbd and the s4 rpc_server code
153         # as to which will set the description as it is DCE/RPC over SMB
154
155         self.assertTrue(self.is_guid(audit["transactionId"]))
156
157         attributes = audit["attributes"]
158         self.assertEqual(1, len(attributes))
159         actions = attributes["clearTextPassword"]["actions"]
160         self.assertEqual(1, len(actions))
161         self.assertTrue(actions[0]["redacted"])
162         self.assertEqual("replace", actions[0]["action"])
163
164     def test_net_set_password(self):
165
166         dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
167         self.discardSetupMessages(dn)
168
169         creds = self.insta_creds(template=self.get_credentials())
170
171         lp = self.get_loadparm()
172         net = Net(creds, lp, server=self.server)
173         password = "newPassword!!42"
174         domain = lp.get("workgroup")
175
176         net.set_password(newpassword=password,
177                          account_name=USER_NAME,
178                          domain_name=domain)
179         messages = self.waitForMessages(1, net, dn=dn)
180         print("Received %d messages" % len(messages))
181         self.assertEqual(1,
182                           len(messages),
183                           "Did not receive the expected number of messages")
184         audit = messages[0]["dsdbChange"]
185         self.assertEqual("Modify", audit["operation"])
186         self.assertFalse(audit["performedAsSystem"])
187         self.assertEqual(dn, audit["dn"])
188         self.assertRegex(audit["remoteAddress"],
189                          self.remoteAddress)
190         session_id = self.get_session()
191         self.assertEqual(session_id, audit["sessionId"])
192         # We skip the check for self.get_service_description() as this
193         # is subject to a race between smbd and the s4 rpc_server code
194         # as to which will set the description as it is DCE/RPC over SMB
195
196         self.assertTrue(self.is_guid(audit["transactionId"]))
197
198         attributes = audit["attributes"]
199         self.assertEqual(1, len(attributes))
200         actions = attributes["clearTextPassword"]["actions"]
201         self.assertEqual(1, len(actions))
202         self.assertTrue(actions[0]["redacted"])
203         self.assertEqual("replace", actions[0]["action"])
204
205     def test_ldap_change_password(self):
206
207         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
208         self.discardSetupMessages(dn)
209
210         new_password = samba.generate_random_password(32, 32)
211         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
212         self.ldb.modify_ldif(
213             "dn: " + dn + "\n" +
214             "changetype: modify\n" +
215             "delete: userPassword\n" +
216             "userPassword: " + USER_PASS + "\n" +
217             "add: userPassword\n" +
218             "userPassword: " + new_password + "\n")
219
220         messages = self.waitForMessages(1)
221         print("Received %d messages" % len(messages))
222         self.assertEqual(1,
223                           len(messages),
224                           "Did not receive the expected number of messages")
225
226         audit = messages[0]["dsdbChange"]
227         self.assertEqual("Modify", audit["operation"])
228         self.assertFalse(audit["performedAsSystem"])
229         self.assertEqual(dn, audit["dn"])
230         self.assertRegex(audit["remoteAddress"],
231                          self.remoteAddress)
232         self.assertTrue(self.is_guid(audit["sessionId"]))
233         session_id = self.get_session()
234         self.assertEqual(session_id, audit["sessionId"])
235         service_description = self.get_service_description()
236         self.assertEqual(service_description, "LDAP")
237
238         attributes = audit["attributes"]
239         self.assertEqual(1, len(attributes))
240         actions = attributes["userPassword"]["actions"]
241         self.assertEqual(2, len(actions))
242         self.assertTrue(actions[0]["redacted"])
243         self.assertEqual("delete", actions[0]["action"])
244         self.assertTrue(actions[1]["redacted"])
245         self.assertEqual("add", actions[1]["action"])
246
247     def test_ldap_replace_password(self):
248
249         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
250         self.discardSetupMessages(dn)
251
252         new_password = samba.generate_random_password(32, 32)
253         self.ldb.modify_ldif(
254             "dn: " + dn + "\n" +
255             "changetype: modify\n" +
256             "replace: userPassword\n" +
257             "userPassword: " + new_password + "\n")
258
259         messages = self.waitForMessages(1, dn=dn)
260         print("Received %d messages" % len(messages))
261         self.assertEqual(1,
262                           len(messages),
263                           "Did not receive the expected number of messages")
264
265         audit = messages[0]["dsdbChange"]
266         self.assertEqual("Modify", audit["operation"])
267         self.assertFalse(audit["performedAsSystem"])
268         self.assertTrue(dn.lower(), audit["dn"].lower())
269         self.assertRegex(audit["remoteAddress"],
270                          self.remoteAddress)
271         self.assertTrue(self.is_guid(audit["sessionId"]))
272         session_id = self.get_session()
273         self.assertEqual(session_id, audit["sessionId"])
274         service_description = self.get_service_description()
275         self.assertEqual(service_description, "LDAP")
276         self.assertTrue(self.is_guid(audit["transactionId"]))
277
278         attributes = audit["attributes"]
279         self.assertEqual(1, len(attributes))
280         actions = attributes["userPassword"]["actions"]
281         self.assertEqual(1, len(actions))
282         self.assertTrue(actions[0]["redacted"])
283         self.assertEqual("replace", actions[0]["action"])
284
285     def test_ldap_add_user(self):
286
287         # The setup code adds a user, so we check for the dsdb events
288         # generated by it.
289         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
290         messages = self.waitForMessages(2, dn=dn)
291         print("Received %d messages" % len(messages))
292         self.assertEqual(2,
293                           len(messages),
294                           "Did not receive the expected number of messages")
295
296         audit = messages[1]["dsdbChange"]
297         self.assertEqual("Add", audit["operation"])
298         self.assertFalse(audit["performedAsSystem"])
299         self.assertEqual(dn, audit["dn"])
300         self.assertRegex(audit["remoteAddress"],
301                          self.remoteAddress)
302         session_id = self.get_session()
303         self.assertEqual(session_id, audit["sessionId"])
304         service_description = self.get_service_description()
305         self.assertEqual(service_description, "LDAP")
306         self.assertTrue(self.is_guid(audit["sessionId"]))
307         self.assertTrue(self.is_guid(audit["transactionId"]))
308
309         attributes = audit["attributes"]
310         self.assertEqual(3, len(attributes))
311
312         actions = attributes["objectclass"]["actions"]
313         self.assertEqual(1, len(actions))
314         self.assertEqual("add", actions[0]["action"])
315         self.assertEqual(1, len(actions[0]["values"]))
316         self.assertEqual("user", actions[0]["values"][0]["value"])
317
318         actions = attributes["sAMAccountName"]["actions"]
319         self.assertEqual(1, len(actions))
320         self.assertEqual("add", actions[0]["action"])
321         self.assertEqual(1, len(actions[0]["values"]))
322         self.assertEqual(USER_NAME, actions[0]["values"][0]["value"])
323
324         actions = attributes["userPassword"]["actions"]
325         self.assertEqual(1, len(actions))
326         self.assertEqual("add", actions[0]["action"])
327         self.assertTrue(actions[0]["redacted"])
328
329     def test_samdb_delete_user(self):
330
331         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
332         self.discardSetupMessages(dn)
333
334         self.ldb.deleteuser(USER_NAME)
335
336         messages = self.waitForMessages(1, dn=dn)
337         print("Received %d messages" % len(messages))
338         self.assertEqual(1,
339                           len(messages),
340                           "Did not receive the expected number of messages")
341
342         audit = messages[0]["dsdbChange"]
343         self.assertEqual("Delete", audit["operation"])
344         self.assertFalse(audit["performedAsSystem"])
345         self.assertTrue(dn.lower(), audit["dn"].lower())
346         self.assertRegex(audit["remoteAddress"],
347                          self.remoteAddress)
348         self.assertTrue(self.is_guid(audit["sessionId"]))
349         self.assertEqual(0, audit["statusCode"])
350         self.assertEqual("Success", audit["status"])
351         session_id = self.get_session()
352         self.assertEqual(session_id, audit["sessionId"])
353         service_description = self.get_service_description()
354         self.assertEqual(service_description, "LDAP")
355
356         transactionId = audit["transactionId"]
357         message = self.waitForTransaction(transactionId)
358         audit = message["dsdbTransaction"]
359         self.assertEqual("commit", audit["action"])
360         self.assertTrue(self.is_guid(audit["transactionId"]))
361         self.assertTrue(audit["duration"] > 0)
362
363     def test_samdb_delete_non_existent_dn(self):
364
365         DOES_NOT_EXIST = "doesNotExist"
366         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
367         self.discardSetupMessages(dn)
368
369         dn = "cn=" + DOES_NOT_EXIST + ",cn=users," + self.base_dn
370         try:
371             self.ldb.delete(dn)
372             self.fail("Exception not thrown")
373         except Exception:
374             pass
375
376         messages = self.waitForMessages(1)
377         print("Received %d messages" % len(messages))
378         self.assertEqual(1,
379                           len(messages),
380                           "Did not receive the expected number of messages")
381
382         audit = messages[0]["dsdbChange"]
383         self.assertEqual("Delete", audit["operation"])
384         self.assertFalse(audit["performedAsSystem"])
385         self.assertTrue(dn.lower(), audit["dn"].lower())
386         self.assertRegex(audit["remoteAddress"],
387                          self.remoteAddress)
388         self.assertEqual(ERR_NO_SUCH_OBJECT, audit["statusCode"])
389         self.assertEqual("No such object", audit["status"])
390         self.assertTrue(self.is_guid(audit["sessionId"]))
391         session_id = self.get_session()
392         self.assertEqual(session_id, audit["sessionId"])
393         service_description = self.get_service_description()
394         self.assertEqual(service_description, "LDAP")
395
396         transactionId = audit["transactionId"]
397         message = self.waitForTransaction(transactionId)
398         audit = message["dsdbTransaction"]
399         self.assertEqual("rollback", audit["action"])
400         self.assertTrue(self.is_guid(audit["transactionId"]))
401         self.assertTrue(audit["duration"] > 0)
402
403     def test_create_and_delete_secret_over_lsa(self):
404
405         dn = "cn=Test Secret,CN=System," + self.base_dn
406         self.discardSetupMessages(dn)
407
408         creds = self.insta_creds(template=self.get_credentials())
409         lsa_conn = lsa.lsarpc(
410             "ncacn_np:%s" % self.server,
411             self.get_loadparm(),
412             creds)
413         lsa_handle = lsa_conn.OpenPolicy2(
414             system_name="\\",
415             attr=lsa.ObjectAttribute(),
416             access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
417         secret_name = lsa.String()
418         secret_name.string = "G$Test"
419         lsa_conn.CreateSecret(
420             handle=lsa_handle,
421             name=secret_name,
422             access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
423
424         messages = self.waitForMessages(1, dn=dn)
425         print("Received %d messages" % len(messages))
426         self.assertEqual(1,
427                           len(messages),
428                           "Did not receive the expected number of messages")
429
430         audit = messages[0]["dsdbChange"]
431         self.assertEqual("Add", audit["operation"])
432         self.assertTrue(audit["performedAsSystem"])
433         self.assertTrue(dn.lower(), audit["dn"].lower())
434         self.assertRegex(audit["remoteAddress"],
435                          self.remoteAddress)
436         self.assertTrue(self.is_guid(audit["sessionId"]))
437         session_id = self.get_session()
438         self.assertEqual(session_id, audit["sessionId"])
439
440         # We skip the check for self.get_service_description() as this
441         # is subject to a race between smbd and the s4 rpc_server code
442         # as to which will set the description as it is DCE/RPC over SMB
443
444         attributes = audit["attributes"]
445         self.assertEqual(2, len(attributes))
446
447         object_class = attributes["objectClass"]
448         self.assertEqual(1, len(object_class["actions"]))
449         action = object_class["actions"][0]
450         self.assertEqual("add", action["action"])
451         values = action["values"]
452         self.assertEqual(1, len(values))
453         self.assertEqual("secret", values[0]["value"])
454
455         cn = attributes["cn"]
456         self.assertEqual(1, len(cn["actions"]))
457         action = cn["actions"][0]
458         self.assertEqual("add", action["action"])
459         values = action["values"]
460         self.assertEqual(1, len(values))
461         self.assertEqual("Test Secret", values[0]["value"])
462
463         #
464         # Now delete the secret.
465         self.discardMessages()
466         h = lsa_conn.OpenSecret(
467             handle=lsa_handle,
468             name=secret_name,
469             access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
470
471         lsa_conn.DeleteObject(h)
472         messages = self.waitForMessages(1, dn=dn)
473         print("Received %d messages" % len(messages))
474         self.assertEqual(1,
475                           len(messages),
476                           "Did not receive the expected number of messages")
477
478         dn = "cn=Test Secret,CN=System," + self.base_dn
479         audit = messages[0]["dsdbChange"]
480         self.assertEqual("Delete", audit["operation"])
481         self.assertTrue(audit["performedAsSystem"])
482         self.assertTrue(dn.lower(), audit["dn"].lower())
483         self.assertRegex(audit["remoteAddress"],
484                          self.remoteAddress)
485         self.assertTrue(self.is_guid(audit["sessionId"]))
486         session_id = self.get_session()
487         self.assertEqual(session_id, audit["sessionId"])
488
489         # We skip the check for self.get_service_description() as this
490         # is subject to a race between smbd and the s4 rpc_server code
491         # as to which will set the description as it is DCE/RPC over SMB
492
493     def test_modify(self):
494
495         dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
496         self.discardSetupMessages(dn)
497
498         #
499         # Add an attribute value
500         #
501         self.ldb.modify_ldif(
502             "dn: " + dn + "\n" +
503             "changetype: modify\n" +
504             "add: carLicense\n" +
505             "carLicense: license-01\n")
506
507         messages = self.waitForMessages(1, dn=dn)
508         print("Received %d messages" % len(messages))
509         self.assertEqual(1,
510                           len(messages),
511                           "Did not receive the expected number of messages")
512
513         audit = messages[0]["dsdbChange"]
514         self.assertEqual("Modify", audit["operation"])
515         self.assertFalse(audit["performedAsSystem"])
516         self.assertEqual(dn, audit["dn"])
517         self.assertRegex(audit["remoteAddress"],
518                          self.remoteAddress)
519         self.assertTrue(self.is_guid(audit["sessionId"]))
520         session_id = self.get_session()
521         self.assertEqual(session_id, audit["sessionId"])
522         service_description = self.get_service_description()
523         self.assertEqual(service_description, "LDAP")
524
525         attributes = audit["attributes"]
526         self.assertEqual(1, len(attributes))
527         actions = attributes["carLicense"]["actions"]
528         self.assertEqual(1, len(actions))
529         self.assertEqual("add", actions[0]["action"])
530         values = actions[0]["values"]
531         self.assertEqual(1, len(values))
532         self.assertEqual("license-01", values[0]["value"])
533
534         #
535         # Add an another value to the attribute
536         #
537         self.discardMessages()
538         self.ldb.modify_ldif(
539             "dn: " + dn + "\n" +
540             "changetype: modify\n" +
541             "add: carLicense\n" +
542             "carLicense: license-02\n")
543
544         messages = self.waitForMessages(1, dn=dn)
545         print("Received %d messages" % len(messages))
546         self.assertEqual(1,
547                           len(messages),
548                           "Did not receive the expected number of messages")
549         attributes = messages[0]["dsdbChange"]["attributes"]
550         self.assertEqual(1, len(attributes))
551         actions = attributes["carLicense"]["actions"]
552         self.assertEqual(1, len(actions))
553         self.assertEqual("add", actions[0]["action"])
554         values = actions[0]["values"]
555         self.assertEqual(1, len(values))
556         self.assertEqual("license-02", values[0]["value"])
557
558         #
559         # Add an another two values to the attribute
560         #
561         self.discardMessages()
562         self.ldb.modify_ldif(
563             "dn: " + dn + "\n" +
564             "changetype: modify\n" +
565             "add: carLicense\n" +
566             "carLicense: license-03\n" +
567             "carLicense: license-04\n")
568
569         messages = self.waitForMessages(1, dn=dn)
570         print("Received %d messages" % len(messages))
571         self.assertEqual(1,
572                           len(messages),
573                           "Did not receive the expected number of messages")
574         attributes = messages[0]["dsdbChange"]["attributes"]
575         self.assertEqual(1, len(attributes))
576         actions = attributes["carLicense"]["actions"]
577         self.assertEqual(1, len(actions))
578         self.assertEqual("add", actions[0]["action"])
579         values = actions[0]["values"]
580         self.assertEqual(2, len(values))
581         self.assertEqual("license-03", values[0]["value"])
582         self.assertEqual("license-04", values[1]["value"])
583
584         #
585         # delete two values to the attribute
586         #
587         self.discardMessages()
588         self.ldb.modify_ldif(
589             "dn: " + dn + "\n" +
590             "changetype: modify\n" +
591             "delete: carLicense\n" +
592             "carLicense: license-03\n" +
593             "carLicense: license-04\n")
594
595         messages = self.waitForMessages(1, dn=dn)
596         print("Received %d messages" % len(messages))
597         self.assertEqual(1,
598                           len(messages),
599                           "Did not receive the expected number of messages")
600         attributes = messages[0]["dsdbChange"]["attributes"]
601         self.assertEqual(1, len(attributes))
602         actions = attributes["carLicense"]["actions"]
603         self.assertEqual(1, len(actions))
604         self.assertEqual("delete", actions[0]["action"])
605         values = actions[0]["values"]
606         self.assertEqual(2, len(values))
607         self.assertEqual("license-03", values[0]["value"])
608         self.assertEqual("license-04", values[1]["value"])
609
610         #
611         # replace two values to the attribute
612         #
613         self.discardMessages()
614         self.ldb.modify_ldif(
615             "dn: " + dn + "\n" +
616             "changetype: modify\n" +
617             "replace: carLicense\n" +
618             "carLicense: license-05\n" +
619             "carLicense: license-06\n")
620
621         messages = self.waitForMessages(1, dn=dn)
622         print("Received %d messages" % len(messages))
623         self.assertEqual(1,
624                           len(messages),
625                           "Did not receive the expected number of messages")
626         attributes = messages[0]["dsdbChange"]["attributes"]
627         self.assertEqual(1, len(attributes))
628         actions = attributes["carLicense"]["actions"]
629         self.assertEqual(1, len(actions))
630         self.assertEqual("replace", actions[0]["action"])
631         values = actions[0]["values"]
632         self.assertEqual(2, len(values))
633         self.assertEqual("license-05", values[0]["value"])
634         self.assertEqual("license-06", values[1]["value"])