dsdb: Log the transaction duraton.
[nivanova/samba-autobuild/.git] / python / samba / tests / audit_log_dsdb.py
index 5329af45e69b7c378615c356ad4eada8751279a8..53d457374248f5faf76768fb15a0467de2ecd699 100644 (file)
@@ -21,6 +21,7 @@ from __future__ import print_function
 
 import samba.tests
 from samba.dcerpc.messaging import MSG_DSDB_LOG, DSDB_EVENT_NAME
+from ldb import ERR_NO_SUCH_OBJECT
 from samba.samdb import SamDB
 from samba.auth import system_session
 import os
@@ -33,8 +34,6 @@ from samba.dcerpc import security, lsa
 
 USER_NAME = "auditlogtestuser"
 USER_PASS = samba.generate_random_password(32, 32)
-SECOND_USER_NAME = "auditlogtestuser02"
-SECOND_USER_PASS = samba.generate_random_password(32, 32)
 
 
 class AuditLogDsdbTests(AuditLogTestBase):
@@ -79,9 +78,6 @@ class AuditLogDsdbTests(AuditLogTestBase):
 
         # (Re)adds the test user USER_NAME with password USER_PASS
         delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
-        delete_force(
-            self.ldb,
-            "cn=" + SECOND_USER_NAME + ",cn=users," + self.base_dn)
         self.ldb.add({
             "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
             "objectclass": "user",
@@ -93,15 +89,21 @@ class AuditLogDsdbTests(AuditLogTestBase):
     # Discard the messages from the setup code
     #
     def discardSetupMessages(self, dn):
-        messages = self.waitForMessages(2, dn=dn)
+        self.waitForMessages(2, dn=dn)
         self.discardMessages()
 
-
     def tearDown(self):
         self.discardMessages()
         super(AuditLogDsdbTests, self).tearDown()
 
-    def waitForTransaction(self, connection=None):
+    def haveExpectedTxn(self, expected):
+        if self.context["txnMessage"] is not None:
+            txn = self.context["txnMessage"]["dsdbTransaction"]
+            if txn["transactionId"] == expected:
+                return True
+        return False
+
+    def waitForTransaction(self, expected, connection=None):
         """Wait for a transaction message to arrive
         The connection is passed through to keep the connection alive
         until all the logging messages have been received.
@@ -110,7 +112,7 @@ class AuditLogDsdbTests(AuditLogTestBase):
         self.connection = connection
 
         start_time = time.time()
-        while self.context["txnMessage"] == "":
+        while not self.haveExpectedTxn(expected):
             self.msg_ctx.loop_once(0.1)
             if time.time() - start_time > 1:
                 self.connection = None
@@ -329,55 +331,72 @@ class AuditLogDsdbTests(AuditLogTestBase):
 
         self.ldb.deleteuser(USER_NAME)
 
-        messages = self.waitForMessages(2, dn=dn)
+        messages = self.waitForMessages(1, dn=dn)
         print("Received %d messages" % len(messages))
-        self.assertEquals(2,
+        self.assertEquals(1,
                           len(messages),
                           "Did not receive the expected number of messages")
 
-        audit = messages[1]["dsdbChange"]
+        audit = messages[0]["dsdbChange"]
         self.assertEquals("Delete", audit["operation"])
         self.assertFalse(audit["performedAsSystem"])
         self.assertTrue(dn.lower(), audit["dn"].lower())
         self.assertRegexpMatches(audit["remoteAddress"],
                                  self.remoteAddress)
         self.assertTrue(self.is_guid(audit["sessionId"]))
+        self.assertEquals(0, audit["statusCode"])
+        self.assertEquals("Success", audit["status"])
         session_id = self.get_session()
         self.assertEquals(session_id, audit["sessionId"])
         service_description = self.get_service_description()
         self.assertEquals(service_description, "LDAP")
 
-    def test_net_set_password_user_without_permission(self):
-
-        self.ldb.newuser(SECOND_USER_NAME, SECOND_USER_PASS)
+        transactionId = audit["transactionId"]
+        message = self.waitForTransaction(transactionId)
+        audit = message["dsdbTransaction"]
+        self.assertEquals("commit", audit["action"])
+        self.assertTrue(self.is_guid(audit["transactionId"]))
+        self.assertTrue(audit["duration"] > 0)
 
-        creds = self.insta_creds(
-            template=self.get_credentials(),
-            username=SECOND_USER_NAME,
-            userpass=SECOND_USER_PASS,
-            kerberos_state=None)
+    def test_samdb_delete_non_existent_dn(self):
 
-        lp = self.get_loadparm()
-        net = Net(creds, lp, server=self.server)
-        password = "newPassword!!42"
-        domain = lp.get("workgroup")
+        DOES_NOT_EXIST = "doesNotExist"
+        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
+        self.discardSetupMessages(dn)
 
-        #
-        # This operation should fail and trigger a transaction roll back.
-        #
+        dn = "cn=" + DOES_NOT_EXIST + ",cn=users," + self.base_dn
         try:
-            net.set_password(newpassword=password.encode('utf-8'),
-                             account_name=USER_NAME,
-                             domain_name=domain)
-            self.fail("Expected exception not thrown")
+            self.ldb.delete(dn)
+            self.fail("Exception not thrown")
         except Exception:
             pass
 
-        message = self.waitForTransaction(net)
+        messages = self.waitForMessages(1)
+        print("Received %d messages" % len(messages))
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        audit = messages[0]["dsdbChange"]
+        self.assertEquals("Delete", audit["operation"])
+        self.assertFalse(audit["performedAsSystem"])
+        self.assertTrue(dn.lower(), audit["dn"].lower())
+        self.assertRegexpMatches(audit["remoteAddress"],
+                                 self.remoteAddress)
+        self.assertEquals(ERR_NO_SUCH_OBJECT, audit["statusCode"])
+        self.assertEquals("No such object", audit["status"])
+        self.assertTrue(self.is_guid(audit["sessionId"]))
+        session_id = self.get_session()
+        self.assertEquals(session_id, audit["sessionId"])
+        service_description = self.get_service_description()
+        self.assertEquals(service_description, "LDAP")
 
+        transactionId = audit["transactionId"]
+        message = self.waitForTransaction(transactionId)
         audit = message["dsdbTransaction"]
         self.assertEquals("rollback", audit["action"])
         self.assertTrue(self.is_guid(audit["transactionId"]))
+        self.assertTrue(audit["duration"] > 0)
 
     def test_create_and_delete_secret_over_lsa(self):