# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for the Auth and AuthZ logging.
"""
-
-from samba import auth
import samba.tests
-from samba.messaging import Messaging
-from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
from samba.dcerpc import srvsvc, dnsserver
-import time
-import json
import os
from samba import smb
from samba.samdb import SamDB
import samba.tests.auth_log_base
-from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
from samba import NTSTATUSError
from subprocess import call
from ldb import LdbError
+from samba.dcerpc.windows_event_ids import (
+ EVT_ID_SUCCESSFUL_LOGON,
+ EVT_ID_UNSUCCESSFUL_LOGON,
+ EVT_LOGON_NETWORK,
+ EVT_LOGON_INTERACTIVE,
+ EVT_LOGON_NETWORK_CLEAR_TEXT
+)
+import re
+
class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
def tearDown(self):
super(AuthLogTests, self).tearDown()
-
-
def _test_rpc_ncacn_np(self, authTypes, creds, service,
binding, protection, checkFunction):
def isLastExpectedMessage(msg):
if service == "dnsserver":
x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
- self.get_loadparm(),
- creds)
+ self.get_loadparm(),
+ creds)
elif service == "srvsvc":
x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
self.get_loadparm(),
messages = self.waitForMessages(isLastExpectedMessage, x)
checkFunction(messages, authTypes, service, binding, protection)
+ def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
+ # Turn "[foo,bar]" into a list ("foo", "bar") to test
+ # lambda x: x removes anything that evaluates to False,
+ # including empty strings, so we handle "" as well
+ binding_list = \
+ list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
+
+ # Handle explicit smb2, smb1 or auto negotiation
+ if "smb2" in binding_list:
+ self.assertEquals(serviceDescription, "SMB2")
+ elif "smb1" in binding_list:
+ self.assertEquals(serviceDescription, "SMB")
+ else:
+ self.assertIn(serviceDescription, ["SMB", "SMB2"])
+
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
binding, protection):
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
- self.assertEquals("SMB",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[1],
+ msg["Authentication"]["authDescription"])
# Check the second message it should be an Authorization
msg = messages[1]
self.assertEquals("Authorization", msg["type"])
- self.assertEquals("SMB",
- msg["Authorization"]["serviceDescription"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
# Check the third message it should be an Authentication
# if we are expecting 4 messages
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertTrue(
- checkServiceDescription(msg["Authentication"]["serviceDescription"]))
-
- self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
-
- def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
+ checkServiceDescription(
+ msg["Authentication"]["serviceDescription"]))
+
+ self.assertEquals(authTypes[3],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
+
+ def rpc_ncacn_np_krb5_check(
+ self,
+ messages,
+ authTypes,
+ service,
+ binding,
+ protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[1],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
# This this the TCP Authentication in response to the message too big
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the third message it should be an Authorization
msg = messages[2]
self.assertEquals("Authorization", msg["type"])
- serviceDescription = "SMB"
- print "binding %s" % binding
- if binding == "[smb2]":
- serviceDescription = "SMB2"
-
- self.assertEquals(serviceDescription,
- msg["Authorization"]["serviceDescription"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
-
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
def test_rpc_ncacn_np_ntlm_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
- creds, "dnsserver", "sign", "SIGN",
- self.rpc_ncacn_np_krb5_check)
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
- creds, "srvsvc", "sign", "SIGN",
- self.rpc_ncacn_np_krb5_check)
+ creds, "srvsvc", "sign", "SIGN",
+ self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
- "ENC-TS Pre-authentication",
- "ENC-TS Pre-authentication",
- "krb5"],
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
creds, "srvsvc", "", "SMB",
self.rpc_ncacn_np_krb5_check)
binding = "[%s]" % binding
if service == "dnsserver":
- conn = dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
- self.get_loadparm(),
- creds)
+ conn = dnsserver.dnsserver(
+ "ncacn_ip_tcp:%s%s" % (self.server, binding),
+ self.get_loadparm(),
+ creds)
elif service == "srvsvc":
conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
-
messages = self.waitForMessages(isLastExpectedMessage, conn)
checkFunction(messages, authTypes, service, binding, protection)
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("DCE/RPC",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
binding, protection):
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the third message it should be an Authentication
msg = messages[2]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2],
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
- creds, "dnsserver", "sign", "SIGN",
- self.rpc_ncacn_ip_tcp_ntlm_check)
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
- creds, "dnsserver", "sign", "SIGN",
- self.rpc_ncacn_ip_tcp_krb5_check)
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
- creds, "dnsserver", "", "SIGN",
- self.rpc_ncacn_ip_tcp_ntlm_check)
+ creds, "dnsserver", "", "SIGN",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
- creds, "dnsserver", "", "SIGN",
- self.rpc_ncacn_ip_tcp_krb5_check)
+ creds, "dnsserver", "", "SIGN",
+ self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
creds = self.insta_creds(template=self.get_credentials(),
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
- creds, "dnsserver", "connect", "NONE",
- self.rpc_ncacn_ip_tcp_ntlm_check)
+ creds, "dnsserver", "connect", "NONE",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
creds = self.insta_creds(template=self.get_credentials(),
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
- creds, "dnsserver", "connect", "NONE",
- self.rpc_ncacn_ip_tcp_krb5_check)
+ creds, "dnsserver", "connect", "NONE",
+ self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
creds = self.insta_creds(template=self.get_credentials(),
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
- creds, "dnsserver", "seal", "SEAL",
- self.rpc_ncacn_ip_tcp_ntlm_check)
+ creds, "dnsserver", "seal", "SEAL",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
creds = self.insta_creds(template=self.get_credentials(),
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
- creds, "dnsserver", "seal", "SEAL",
- self.rpc_ncacn_ip_tcp_krb5_check)
+ creds, "dnsserver", "seal", "SEAL",
+ self.rpc_ncacn_ip_tcp_krb5_check)
def test_ldap(self):
msg["Authorization"]["authType"] == "krb5")
self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=self.get_credentials())
messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
+ self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
- # Check the first message it should be an Authentication
+ # Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
+ self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_ldap_ntlm(self):
msg["Authorization"]["authType"] == "NTLMSSP")
self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=self.get_credentials())
messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("LDAP",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
+ self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_ldap_simple_bind(self):
def isLastExpectedMessage(msg):
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
- creds.get_username()))
+ creds.get_username()))
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("LDAP",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("simple bind",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
def test_ldap_simple_bind_bad_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "LDAP" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD" and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_password("badPassword")
creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
- creds.get_username()))
+ creds.get_username()))
thrown = False
try:
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
except LdbError:
thrown = True
len(messages),
"Did not receive the expected number of messages")
-
def test_ldap_simple_bind_bad_user(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "LDAP" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
thrown = False
try:
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
except LdbError:
thrown = True
len(messages),
"Did not receive the expected number of messages")
-
def test_ldap_simple_bind_unparseable_user(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "LDAP" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
thrown = False
try:
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
except LdbError:
thrown = True
def test_ldap_anonymous_access_bind_only(self):
# Should be no logging for anonymous bind
# so receiving any message indicates a failure.
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return True
creds = self.insta_creds(template=self.get_credentials())
creds.set_anonymous()
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(0,
len(messages),
"Did not receive the expected number of messages")
def test_ldap_anonymous_access(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "TLS" and
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["authType"] == "no bind")
creds.set_anonymous()
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
- lp = self.get_loadparm(),
+ lp=self.get_loadparm(),
credentials=creds)
try:
- res = self.samdb.search(base=self.samdb.domain_dn())
- self.fail( "Expected an LdbError exception")
+ self.samdb.search(base=self.samdb.domain_dn())
+ self.fail("Expected an LdbError exception")
except LdbError:
pass
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
+
def test_smb(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_bad_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "Kerberos KDC" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD" and
- msg["Authentication"]["authDescription"]
- == "ENC-TS Pre-authentication")
+ (msg["Authentication"]["serviceDescription"] ==
+ "Kerberos KDC") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["authDescription"] ==
+ "ENC-TS Pre-authentication"))
creds = self.insta_creds(template=self.get_credentials())
creds.set_password("badPassword")
len(messages),
"Did not receive the expected number of messages")
-
def test_smb_bad_user(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "Kerberos KDC" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["authDescription"]
- == "ENC-TS Pre-authentication")
+ (msg["Authentication"]["serviceDescription"] ==
+ "Kerberos KDC") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["authDescription"] ==
+ "ENC-TS Pre-authentication") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials())
creds.set_username("badUser")
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
- server = os.environ["SERVER"]
+ server = os.environ["SERVER"]
path = "//%s/IPC$" % server
auth = "-N"
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
msg["Authentication"]["becameAccount"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb2_anonymous(self):
def isLastExpectedMessage(msg):
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
- server = os.environ["SERVER"]
+ server = os.environ["SERVER"]
path = "//%s/IPC$" % server
auth = "-N"
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
msg["Authentication"]["becameAccount"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_spnego(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv2",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_spnego_bad_password(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv1",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
thrown = True
self.assertEquals(thrown, True)
-
messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER")
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
thrown = True
self.assertEquals(thrown, True)
-
messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "interactive" and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] ==
+ "interactive") and
msg["Authentication"]["status"] == "NT_STATUS_OK" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "interactive" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] ==
+ "interactive") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "interactive" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = "badUser"
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] ==
+ "interactive") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ msg["Authentication"]["authDescription"] == "network" and
msg["Authentication"]["status"] == "NT_STATUS_OK" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ msg["Authentication"]["authDescription"] == "network" and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = "badUser"
- password = os.environ["PASSWORD"]
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
+ password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"] == "NT_STATUS_OK" and
- msg["Authentication"]["passwordType"] == "MSCHAPv2" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] == "NT_STATUS_OK") and
+ (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
- samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
-
+ samlogon = "samlogon %s %s %s %d 0x00010000" % (
+ user, password, workstation, 2)
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"]
- == "NT_STATUS_WRONG_PASSWORD" and
- msg["Authentication"]["passwordType"] == "MSCHAPv2" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
- samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
-
+ samlogon = "samlogon %s %s %s %d 0x00010000" % (
+ user, password, workstation, 2)
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"]
- == "NT_STATUS_NO_SUCH_USER" and
- msg["Authentication"]["passwordType"] == "MSCHAPv2" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = "badUser"
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] ==
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
password = os.environ["PASSWORD"]
- samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
-
+ samlogon = "samlogon %s %s %s %d 0x00010000" % (
+ user, password, workstation, 2)
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"] == "NT_STATUS_OK" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] == "NT_STATUS_OK") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
-
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
self.assertEquals("Authorization", msg["type"])
self.assertEquals("DCE/RPC",
msg["Authorization"]["serviceDescription"])
- self.assertEquals("schannel", msg["Authorization"]["authType"])
+ self.assertEquals("schannel", msg["Authorization"]["authType"])
self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
# Signed logons get promoted to sealed, this test ensures that
- # this behaviour is not removed accidently
+ # this behaviour is not removed accidentally
def test_samlogon_schannel_sign(self):
workstation = "AuthLogTests"
- def isLastExpectedMessage( msg):
- return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"]
- == "SamLogon" and
- msg["Authentication"]["authDescription"]
- == "network" and
- msg["Authentication"]["status"] == "NT_STATUS_OK" and
- msg["Authentication"]["workstation"]
- == r"\\%s" % workstation)
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ def isLastExpectedMessage(msg):
+ return ((msg["type"] == "Authentication") and
+ (msg["Authentication"]["serviceDescription"] ==
+ "SamLogon") and
+ (msg["Authentication"]["authDescription"] == "network") and
+ (msg["Authentication"]["status"] == "NT_STATUS_OK") and
+ (msg["Authentication"]["workstation"] ==
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
- samlogon = "schannelsign;samlogon %s %s %s" % (user, password, workstation)
-
+ samlogon = "schannelsign;samlogon %s %s %s" % (
+ user, password, workstation)
call(["bin/rpcclient", "-c", samlogon, "-U%", server])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
messages = self.remove_netlogon_messages(messages)
received = len(messages)
self.assertIs(True,
self.assertEquals("Authorization", msg["type"])
self.assertEquals("DCE/RPC",
msg["Authorization"]["serviceDescription"])
- self.assertEquals("schannel", msg["Authorization"]["authType"])
+ self.assertEquals("schannel", msg["Authorization"]["authType"])
self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
+ self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))