auth_log: Add tests by listening for JSON messages over the message bus
authorAndrew Bartlett <abartlet@samba.org>
Tue, 14 Mar 2017 03:43:06 +0000 (16:43 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 29 Mar 2017 00:37:25 +0000 (02:37 +0200)
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Pair-programmed-by: Gary Lockyer <gary@catalyst.net.nz>
python/samba/tests/auth_log.py [new file with mode: 0644]
python/samba/tests/auth_log_base.py [new file with mode: 0644]
python/samba/tests/auth_log_ncalrpc.py [new file with mode: 0644]
selftest/knownfail
source4/selftest/tests.py

diff --git a/python/samba/tests/auth_log.py b/python/samba/tests/auth_log.py
new file mode 100644 (file)
index 0000000..abd2952
--- /dev/null
@@ -0,0 +1,801 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import srvsvc, dnsserver
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+import samba.tests.auth_log_base
+from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba import NTSTATUSError
+from subprocess import call
+
+class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
+
+    def setUp(self):
+        super(AuthLogTests, self).setUp()
+        self.remoteAddress = os.environ["CLIENT_IP"]
+
+    def tearDown(self):
+        super(AuthLogTests, self).tearDown()
+
+
+
+    def _test_rpc_ncacn_np(self, authTypes, creds, service,
+                           binding, protection, checkFunction):
+        def isLastExpectedMessage( msg):
+            return (
+                msg["type"] == "Authorization" and
+                ( msg["Authorization"]["serviceDescription"]  == "DCE/RPC" or
+                  msg["Authorization"]["serviceDescription"]  == service) and
+                msg["Authorization"]["authType"]            == authTypes[0] and
+                msg["Authorization"]["transportProtection"] == protection
+            )
+
+        if binding:
+            binding = "[%s]" % binding
+
+        if service == "dnsserver":
+            x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
+                                self.get_loadparm(),
+                                creds)
+        elif service == "srvsvc":
+            x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
+                              self.get_loadparm(),
+                              creds)
+
+        # The connection is passed to ensure the server
+        # messaging context stays up until all the messages have been received.
+        messages = self.waitForMessages(isLastExpectedMessage, x)
+        checkFunction(messages, authTypes, service, binding, protection)
+
+    def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
+                                binding, protection):
+
+        expected_messages = len(authTypes)
+        self.assertEquals(expected_messages,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("SMB",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+
+        # Check the second message it should be an Authorization
+        msg = messages[1]
+        self.assertEquals("Authorization", msg["type"])
+        self.assertEquals("SMB",
+                          msg["Authorization"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
+        self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
+
+        # Check the third message it should be an Authentication
+        # if we are expecting 4 messages
+        if expected_messages == 4:
+            def checkServiceDescription( desc):
+                return (desc == "DCE/RPC" or desc == service)
+
+            msg = messages[2]
+            self.assertEquals("Authentication", msg["type"])
+            self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+            self.assertTrue(
+                checkServiceDescription( msg["Authentication"]["serviceDescription"]))
+
+            self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
+
+    def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
+
+        expected_messages = len(authTypes)
+        self.assertEquals(expected_messages,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first  message it should be an Authentication
+        # This is almost certainly Authentication over UDP, and is probably
+        # returning message too big,
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+
+        # Check the second message it should be an Authentication
+        # This this the TCP Authentication in response to the message too big
+        # response to the UDP Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+        # Check the third message it should be an Authorization
+        msg = messages[2]
+        self.assertEquals("Authorization", msg["type"])
+        serviceDescription = "SMB"
+        print "binding %s" % binding
+        if binding == "[smb2]":
+            serviceDescription = "SMB2"
+
+        self.assertEquals(serviceDescription,
+                          msg["Authorization"]["serviceDescription"])
+        self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
+        self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
+
+
+    def test_rpc_ncacn_np_ntlm_dns_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["NTLMSSP",
+                                 "NTLMSSP",
+                                 "NTLMSSP",
+                                 "NTLMSSP"],
+                                creds, "dnsserver", "sign", "SIGN",
+                                self.rpc_ncacn_np_ntlm_check)
+
+    def test_rpc_ncacn_np_ntlm_srv_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["NTLMSSP",
+                                 "NTLMSSP",
+                                 "NTLMSSP",
+                                 "NTLMSSP"],
+                                creds, "srvsvc", "sign", "SIGN",
+                                self.rpc_ncacn_np_ntlm_check)
+
+    def test_rpc_ncacn_np_ntlm_dns(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["ncacn_np",
+                                 "NTLMSSP",
+                                 "NTLMSSP"],
+                                creds, "dnsserver", "", "SMB",
+                                self.rpc_ncacn_np_ntlm_check)
+
+    def test_rpc_ncacn_np_ntlm_srv(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["ncacn_np",
+                                 "NTLMSSP",
+                                 "NTLMSSP"],
+                                creds, "srvsvc", "", "SMB",
+                                self.rpc_ncacn_np_ntlm_check)
+
+    def test_rpc_ncacn_np_krb_dns_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["krb5",
+                                 "ENC-TS Pre-authentication",
+                                 "ENC-TS Pre-authentication",
+                                 "krb5"],
+                                 creds, "dnsserver", "sign", "SIGN",
+                                 self.rpc_ncacn_np_krb5_check)
+
+    def test_rpc_ncacn_np_krb_srv_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["krb5",
+                                 "ENC-TS Pre-authentication",
+                                 "ENC-TS Pre-authentication",
+                                 "krb5"],
+                                 creds, "srvsvc", "sign", "SIGN",
+                                 self.rpc_ncacn_np_krb5_check)
+
+    def test_rpc_ncacn_np_krb_dns(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["ncacn_np",
+                                 "ENC-TS Pre-authentication",
+                                 "ENC-TS Pre-authentication",
+                                 "krb5"],
+                                creds, "dnsserver", "", "SMB",
+                                self.rpc_ncacn_np_krb5_check)
+
+    def test_rpc_ncacn_np_krb_dns_smb2(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["ncacn_np",
+                                 "ENC-TS Pre-authentication",
+                                 "ENC-TS Pre-authentication",
+                                 "krb5"],
+                                creds, "dnsserver", "smb2", "SMB",
+                                self.rpc_ncacn_np_krb5_check)
+
+    def test_rpc_ncacn_np_krb_srv(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_np(["ncacn_np",
+                                "ENC-TS Pre-authentication",
+                                "ENC-TS Pre-authentication",
+                                "krb5"],
+                                creds, "srvsvc", "", "SMB",
+                                self.rpc_ncacn_np_krb5_check)
+
+    def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
+                               binding, protection, checkFunction):
+        def isLastExpectedMessage( msg):
+            return (
+                msg["type"] == "Authorization" and
+                msg["Authorization"]["serviceDescription"]  == "DCE/RPC" and
+                msg["Authorization"]["authType"]            == authTypes[0] and
+                msg["Authorization"]["transportProtection"] == protection
+            )
+
+        if binding:
+            binding = "[%s]" % binding
+
+        if service == "dnsserver":
+            dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
+                                self.get_loadparm(),
+                                creds)
+        elif service == "srvsvc":
+           srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
+                         self.get_loadparm(),
+                         creds)
+
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        checkFunction(messages, authTypes, service, binding, protection)
+
+    def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
+                                    binding, protection):
+
+        expected_messages = len(authTypes)
+        self.assertEquals(expected_messages,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first message it should be an Authorization
+        msg = messages[0]
+        self.assertEquals("Authorization", msg["type"])
+        self.assertEquals("DCE/RPC",
+                          msg["Authorization"]["serviceDescription"])
+        self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+        self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+        # Check the second message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("DCE/RPC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+    def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
+                                    binding, protection):
+
+        expected_messages = len(authTypes)
+        self.assertEquals(expected_messages,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first message it should be an Authorization
+        msg = messages[0]
+        self.assertEquals("Authorization", msg["type"])
+        self.assertEquals("DCE/RPC",
+                          msg["Authorization"]["serviceDescription"])
+        self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+        self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+        # Check the second message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+        # Check the third message it should be an Authentication
+        msg = messages[2]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+    def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+                                     "ncacn_ip_tcp",
+                                     "NTLMSSP"],
+                                     creds, "dnsserver", "sign", "SIGN",
+                                     self.rpc_ncacn_ip_tcp_ntlm_check)
+
+    def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["krb5",
+                                     "ncacn_ip_tcp",
+                                     "ENC-TS Pre-authentication",
+                                     "ENC-TS Pre-authentication"],
+                                     creds, "dnsserver", "sign", "SIGN",
+                                     self.rpc_ncacn_ip_tcp_krb5_check)
+
+    def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+                                     "ncacn_ip_tcp",
+                                     "NTLMSSP"],
+                                     creds, "dnsserver", "", "SIGN",
+                                     self.rpc_ncacn_ip_tcp_ntlm_check)
+
+    def test_rpc_ncacn_ip_tcp_krb5_dns(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["krb5",
+                                     "ncacn_ip_tcp",
+                                     "ENC-TS Pre-authentication",
+                                     "ENC-TS Pre-authentication"],
+                                     creds, "dnsserver", "", "SIGN",
+                                     self.rpc_ncacn_ip_tcp_krb5_check)
+
+    def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+                                     "ncacn_ip_tcp",
+                                     "NTLMSSP"],
+                                     creds, "dnsserver", "connect", "NONE",
+                                     self.rpc_ncacn_ip_tcp_ntlm_check)
+
+    def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["krb5",
+                                     "ncacn_ip_tcp",
+                                     "ENC-TS Pre-authentication",
+                                     "ENC-TS Pre-authentication"],
+                                     creds, "dnsserver", "connect", "NONE",
+                                     self.rpc_ncacn_ip_tcp_krb5_check)
+
+    def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+                                     "ncacn_ip_tcp",
+                                     "NTLMSSP"],
+                                     creds, "dnsserver", "seal", "SEAL",
+                                     self.rpc_ncacn_ip_tcp_ntlm_check)
+
+    def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=MUST_USE_KERBEROS)
+        self._test_rpc_ncacn_ip_tcp(["krb5",
+                                     "ncacn_ip_tcp",
+                                     "ENC-TS Pre-authentication",
+                                     "ENC-TS Pre-authentication"],
+                                     creds, "dnsserver", "seal", "SEAL",
+                                     self.rpc_ncacn_ip_tcp_krb5_check)
+
+    def test_ldap(self):
+
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"]  == "LDAP" and
+                    msg["Authorization"]["transportProtection"] == "SIGN" and
+                    msg["Authorization"]["authType"] == "krb5")
+
+        self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
+                           lp = self.get_loadparm(),
+                           credentials=self.get_credentials())
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(3,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("ENC-TS Pre-authentication",
+                           msg["Authentication"]["authDescription"])
+
+        # Check the first  message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("ENC-TS Pre-authentication",
+                           msg["Authentication"]["authDescription"])
+
+    def test_ldap_ntlm(self):
+
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"]  == "LDAP" and
+                    msg["Authorization"]["transportProtection"] == "SEAL" and
+                    msg["Authorization"]["authType"] == "NTLMSSP")
+
+        self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
+                           lp = self.get_loadparm(),
+                           credentials=self.get_credentials())
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(2,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+        # Check the first message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("LDAP",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
+
+    def test_ldap_simple_bind(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"]  == "LDAP" and
+                    msg["Authorization"]["transportProtection"] == "TLS" and
+                    msg["Authorization"]["authType"] == "simple bind")
+
+        creds = self.insta_creds(template=self.get_credentials())
+        creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
+                                     creds.get_username()))
+
+        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
+                           lp = self.get_loadparm(),
+                           credentials=creds)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(2,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("LDAP",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("simple bind",
+                           msg["Authentication"]["authDescription"])
+
+    def test_smb(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"]  == "SMB" and
+                    msg["Authorization"]["authType"]            == "krb5" and
+                    msg["Authorization"]["transportProtection"] == "SMB")
+
+        creds = self.insta_creds(template=self.get_credentials())
+        smb.SMB(self.server,
+                "sysvol",
+                lp=self.get_loadparm(),
+                creds=creds)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(3,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("ENC-TS Pre-authentication",
+                           msg["Authentication"]["authDescription"])
+
+        # Check the second message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("Kerberos KDC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("ENC-TS Pre-authentication",
+                           msg["Authentication"]["authDescription"])
+
+    def test_smb_bad_password(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"]
+                        == "Kerberos KDC" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_WRONG_PASSWORD" and
+                    msg["Authentication"]["authDescription"]
+                        == "ENC-TS Pre-authentication")
+
+        creds = self.insta_creds(template=self.get_credentials())
+        creds.set_password("badPassword")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds)
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+
+    def test_smb_bad_user(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"]
+                        == "Kerberos KDC" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_NO_SUCH_USER" and
+                    msg["Authentication"]["authDescription"]
+                        == "ENC-TS Pre-authentication")
+
+        creds = self.insta_creds(template=self.get_credentials())
+        creds.set_username("badUser")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds)
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+    def test_smb_anonymous(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"] == "SMB" and
+                    msg["Authorization"]["authType"]           == "NTLMSSP" and
+                    msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
+                    msg["Authorization"]["transportProtection"] == "SMB")
+
+        server   = os.environ["SERVER"]
+
+        path = "//%s/IPC$" % server
+        auth = "-N"
+        call(["bin/smbclient", path, auth, "-c quit"])
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(3,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_NO_SUCH_USER",
+                          msg["Authentication"]["status"])
+        self.assertEquals("SMB",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("NTLMSSP",
+                           msg["Authentication"]["authDescription"])
+        self.assertEquals("No-Password",
+                           msg["Authentication"]["passwordType"])
+
+        # Check the second message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK",
+                          msg["Authentication"]["status"])
+        self.assertEquals("SMB",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("NTLMSSP",
+                           msg["Authentication"]["authDescription"])
+        self.assertEquals("No-Password",
+                           msg["Authentication"]["passwordType"])
+        self.assertEquals("ANONYMOUS LOGON",
+                           msg["Authentication"]["becameAccount"])
+
+    def test_smb_no_krb_spnego(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"] == "SMB" and
+                    msg["Authorization"]["authType"] == "NTLMSSP" and
+                    msg["Authorization"]["transportProtection"] == "SMB")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        smb.SMB(self.server,
+                "sysvol",
+                lp=self.get_loadparm(),
+                creds=creds)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(2,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("SMB",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("NTLMSSP",
+                           msg["Authentication"]["authDescription"])
+        self.assertEquals("NTLMv2",
+                           msg["Authentication"]["passwordType"])
+
+    def test_smb_no_krb_spnego_bad_password(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"] == "SMB" and
+                    msg["Authentication"]["authDescription"] == "NTLMSSP" and
+                    msg["Authentication"]["passwordType"] == "NTLMv2" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_WRONG_PASSWORD")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        creds.set_password("badPassword")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds)
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+    def test_smb_no_krb_spnego_bad_user(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"] == "SMB" and
+                    msg["Authentication"]["authDescription"] == "NTLMSSP" and
+                    msg["Authentication"]["passwordType"] == "NTLMv2" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_NO_SUCH_USER")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        creds.set_username("badUser")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds)
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+    def test_smb_no_krb_no_spnego_no_ntlmv2(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authorization" and
+                    msg["Authorization"]["serviceDescription"] == "SMB" and
+                    msg["Authorization"]["authType"] == "bare-NTLM" and
+                    msg["Authorization"]["transportProtection"] == "SMB")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        smb.SMB(self.server,
+                "sysvol",
+                lp=self.get_loadparm(),
+                creds=creds,
+                ntlmv2_auth = False,
+                use_spnego = False )
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(2,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+        # Check the first  message it should be an Authentication
+        msg = messages[0]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("SMB",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals("bare-NTLM",
+                           msg["Authentication"]["authDescription"])
+        self.assertEquals("NTLMv1",
+                           msg["Authentication"]["passwordType"])
+
+    def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"] == "SMB" and
+                    msg["Authentication"]["authDescription"] == "bare-NTLM" and
+                    msg["Authentication"]["passwordType"] == "NTLMv1" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_WRONG_PASSWORD")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        creds.set_password("badPassword")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds,
+                    ntlmv2_auth = False,
+                    use_spnego = False )
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+    def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
+        def isLastExpectedMessage( msg):
+            return (msg["type"] == "Authentication" and
+                    msg["Authentication"]["serviceDescription"] == "SMB" and
+                    msg["Authentication"]["authDescription"] == "bare-NTLM" and
+                    msg["Authentication"]["passwordType"] == "NTLMv1" and
+                    msg["Authentication"]["status"]
+                        == "NT_STATUS_NO_SUCH_USER")
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        creds.set_username("badUser")
+
+        thrown = False
+        try:
+            smb.SMB(self.server,
+                    "sysvol",
+                    lp=self.get_loadparm(),
+                    creds=creds,
+                    ntlmv2_auth = False,
+                    use_spnego = False )
+        except NTSTATUSError:
+            thrown = True
+        self.assertEquals( thrown, True)
+
+
+        messages = self.waitForMessages( isLastExpectedMessage)
+        self.assertEquals(1,
+                          len(messages),
+                          "Did not receive the expected number of messages")
diff --git a/python/samba/tests/auth_log_base.py b/python/samba/tests/auth_log_base.py
new file mode 100644 (file)
index 0000000..2b3ccfd
--- /dev/null
@@ -0,0 +1,102 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import srvsvc, dnsserver
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+
+class AuthLogTestBase(samba.tests.TestCase):
+
+    def setUp(self):
+        super(AuthLogTestBase, self).setUp()
+        lp_ctx = self.get_loadparm()
+        self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx);
+        self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
+
+        def messageHandler( context, msgType, src, message):
+            # This does not look like sub unit output and it
+            # makes these tests much easier to debug.
+            print message
+            jsonMsg = json.loads(message)
+            context["messages"].append( jsonMsg)
+
+        self.context = { "messages": []}
+        self.msg_handler_and_context = (messageHandler, self.context)
+        self.msg_ctx.register(self.msg_handler_and_context,
+                              msg_type=MSG_AUTH_LOG)
+
+        # Discard any previously queued messages.
+        self.msg_ctx.loop_once(0.001)
+        while len( self.context["messages"]):
+            self.msg_ctx.loop_once(0.001)
+        self.context["messages"] = []
+
+
+        self.remoteAddress = None
+        self.server = os.environ["SERVER"]
+        self.connection = None
+
+    def tearDown(self):
+        if self.msg_handler_and_context:
+            self.msg_ctx.deregister(self.msg_handler_and_context,
+                                    msg_type=MSG_AUTH_LOG)
+
+
+    def waitForMessages(self, isLastExpectedMessage, connection=None):
+
+        def completed( messages):
+            for message in messages:
+                if isRemote( message) and isLastExpectedMessage( message):
+                    return True
+            return False
+
+        def isRemote( message):
+            remote = None
+            if message["type"] == "Authorization":
+                remote = message["Authorization"]["remoteAddress"]
+            elif message["type"] == "Authentication":
+                remote = message["Authentication"]["remoteAddress"]
+            else:
+                return False
+
+            try:
+                addr = remote.split(":")
+                return addr[1] == self.remoteAddress
+            except IndexError:
+                return False
+
+        self.connection = connection
+
+        start_time = time.time()
+        while not completed( self.context["messages"]):
+            self.msg_ctx.loop_once(0.1)
+            if time.time() - start_time > 1:
+                self.connection = None
+                return []
+
+        self.connection = None
+        return filter( isRemote, self.context["messages"])
diff --git a/python/samba/tests/auth_log_ncalrpc.py b/python/samba/tests/auth_log_ncalrpc.py
new file mode 100644 (file)
index 0000000..2538c61
--- /dev/null
@@ -0,0 +1,104 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import samr
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+import samba.tests.auth_log_base
+from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+
+class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase):
+
+    def setUp(self):
+        super(AuthLogTestsNcalrpc, self).setUp()
+        self.remoteAddress = "/root/ncalrpc_as_system"
+
+    def tearDown(self):
+        super(AuthLogTestsNcalrpc , self).tearDown()
+
+
+    def _test_rpc_ncaclrpc(self, authTypes, binding, creds,
+                           protection, checkFunction):
+
+        def isLastExpectedMessage( msg):
+            return (
+                msg["type"] == "Authorization" and
+                msg["Authorization"]["serviceDescription"]  == "DCE/RPC" and
+                msg["Authorization"]["authType"]            == authTypes[0] and
+                msg["Authorization"]["transportProtection"] == protection
+            )
+
+        if binding:
+            binding = "[%s]" % binding
+
+        samr.samr("ncalrpc:%s" % binding, self.get_loadparm(), creds)
+        messages = self.waitForMessages( isLastExpectedMessage)
+        checkFunction(messages, authTypes, protection)
+
+    def rpc_ncacn_np_ntlm_check(self, messages, authTypes, protection):
+
+        expected_messages = len(authTypes)
+        self.assertEquals(expected_messages,
+                          len(messages),
+                          "Did not receive the expected number of messages")
+
+        # Check the first message it should be an Authorization
+        msg = messages[0]
+        self.assertEquals("Authorization", msg["type"])
+        self.assertEquals("DCE/RPC",
+                          msg["Authorization"]["serviceDescription"])
+        self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+        self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+        # Check the second message it should be an Authentication
+        msg = messages[1]
+        self.assertEquals("Authentication", msg["type"])
+        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+        self.assertEquals("DCE/RPC",
+                           msg["Authentication"]["serviceDescription"])
+        self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+
+    def test_ncalrpc_ntlm_dns_sign(self):
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncaclrpc(["NTLMSSP",
+                                 "ncalrpc",
+                                 "NTLMSSP"],
+                                "", creds, "SIGN",
+                                self.rpc_ncacn_np_ntlm_check)
+
+    def test_ncalrpc_ntlm_dns_seal(self):
+
+        creds = self.insta_creds(template=self.get_credentials(),
+                                 kerberos_state=DONT_USE_KERBEROS)
+        self._test_rpc_ncaclrpc(["NTLMSSP",
+                                 "ncalrpc",
+                                 "NTLMSSP"],
+                                "seal", creds, "SEAL",
+                                self.rpc_ncacn_np_ntlm_check)
index b25038064c3c458811d46f06f4e0e473ec8abaad..d7a10ef8396c588f7375383e6fd3c2b5b146e4e9 100644 (file)
 ^samba3.smb2.credits.skipped_mid.*
 ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_dbcheck
 ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_check_missing
 ^samba3.smb2.credits.skipped_mid.*
 ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_dbcheck
 ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_check_missing
+^samba.tests.auth_log.samba.tests.auth_log.AuthLogTests
+^samba.tests.auth_log_ncalrpc.samba.tests.auth_log_ncalrpc.AuthLogTestsNcalrpc
index 890d41ed713f7dae32d2a8ea3613279ae56220a4..bef08c8b7b5d89ac9b8cc02c73bd0d045658cf0e 100755 (executable)
@@ -68,6 +68,7 @@ finally:
 
 have_tls_support = ("ENABLE_GNUTLS" in config_hash)
 have_heimdal_support = ("SAMBA4_USES_HEIMDAL" in config_hash)
 
 have_tls_support = ("ENABLE_GNUTLS" in config_hash)
 have_heimdal_support = ("SAMBA4_USES_HEIMDAL" in config_hash)
+have_jansson_support = ("HAVE_JANSSON" in config_hash)
 
 if have_tls_support:
     for options in ['-U"$USERNAME%$PASSWORD"']:
 
 if have_tls_support:
     for options in ['-U"$USERNAME%$PASSWORD"']:
@@ -588,6 +589,15 @@ planpythontestsuite("ad_dc:local", "samba.tests.samba_tool.dnscmd")
 planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.rpcecho")
 planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.registry", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc_ntvfs", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.rpcecho")
 planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.registry", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc_ntvfs", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
+if have_jansson_support:
+    planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'],
+                           environ={'CLIENT_IP': '127.0.0.11',
+                                    'SOCKET_WRAPPER_DEFAULT_IFACE': 11})
+    planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'],
+                           environ={'CLIENT_IP': '127.0.0.11',
+                                    'SOCKET_WRAPPER_DEFAULT_IFACE': 11})
+    planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"'])
+    planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.raw_protocol", extra_args=['-U"$USERNAME%$PASSWORD"'])
 plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.raw_protocol", extra_args=['-U"$USERNAME%$PASSWORD"'])
 plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])