from samba import NTSTATUSError
from subprocess import call
from ldb import LdbError
+from samba.dcerpc.windows_event_ids import (
+ EVT_ID_SUCCESSFUL_LOGON,
+ EVT_ID_UNSUCCESSFUL_LOGON,
+ EVT_LOGON_NETWORK,
+ EVT_LOGON_INTERACTIVE,
+ EVT_LOGON_NETWORK_CLEAR_TEXT
+)
import re
# Turn "[foo,bar]" into a list ("foo", "bar") to test
# lambda x: x removes anything that evaluates to False,
# including empty strings, so we handle "" as well
- binding_list = list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
+ binding_list = \
+ list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
# Handle explicit smb2, smb1 or auto negotiation
if "smb2" in binding_list:
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
- self._assert_ncacn_np_serviceDescription(binding,
- msg["Authentication"]["serviceDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[1],
msg["Authentication"]["authDescription"])
# Check the second message it should be an Authorization
msg = messages[1]
self.assertEquals("Authorization", msg["type"])
- self._assert_ncacn_np_serviceDescription(binding,
- msg["Authorization"]["serviceDescription"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
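Review bot: The two assertions on `eventId` and `logonType` added after the `NT_STATUS_OK` check are the same pair this commit adds after nearly every other Authentication check (the ncacn_np and ncacn_ip_tcp krb5 checks, the LDAP tests and the SMB tests), and they appear in two different wrapping styles. A small helper on the test class that takes the message and the two expected values would keep these audit-field checks in one place, so a renamed or additional field is handled once. Call sites would then read `self._assert_logon_event(msg, EVT_ID_SUCCESSFUL_LOGON, EVT_LOGON_NETWORK)`, where the helper name is only a suggestion. The enclosing method starts and ends outside this hunk.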
self.assertEquals(authTypes[3],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_np_krb5_check(
self,
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[1],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
# This this the TCP Authentication in response to the message too big
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the third message it should be an Authorization
msg = messages[2]
self.assertEquals("Authorization", msg["type"])
- self._assert_ncacn_np_serviceDescription(binding,
- msg["Authorization"]["serviceDescription"])
+ self._assert_ncacn_np_serviceDescription(
+ binding, msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
binding, protection):
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the third message it should be an Authentication
msg = messages[2]
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2],
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_ldap_ntlm(self):
msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
self.assertTrue(msg["Authentication"]["duration"] > 0)
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def test_ldap_simple_bind(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["serviceDescription"])
self.assertEquals("simple bind",
msg["Authentication"]["authDescription"])
+ self.assertEquals(
+ EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
+ self.assertEquals(
+ EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
def test_ldap_simple_bind_bad_password(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["serviceDescription"] == "LDAP" and
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_password("badPassword")
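Review bot: Adding `eventId` and `logonType` to the `isLastExpectedMessage` predicate means a server that logs the wrong event id or logon type no longer matches at all, so the regression would presumably show up as the expected message never arriving rather than as a failure naming the field. These two fields are what log consumers are likely to key detections on, so I would leave the predicate on the fields that identify the message (type, service, status, authDescription) and assert the new ones on the matched message, e.g. `self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON, msg["Authentication"]["eventId"])`, as the successful-logon tests do. The same applies to the other predicates extended in this commit: the remaining LDAP simple-bind cases, the SMB bad-password cases and the samlogon cases. The predicate and the wait call that uses it begin and end outside this hunk, so how the matched message is retrieved is not visible here.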
msg["Authentication"]["serviceDescription"] == "LDAP" and
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
msg["Authentication"]["serviceDescription"] == "LDAP" and
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
- msg["Authentication"]["authDescription"] == "simple bind")
+ (msg["Authentication"]["authDescription"] ==
+ "simple bind") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK_CLEAR_TEXT))
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
def test_ldap_anonymous_access(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "TLS" and
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["authType"] == "no bind")
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_bad_password(self):
def isLastExpectedMessage(msg):
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
(msg["Authentication"]["authDescription"] ==
- "ENC-TS Pre-authentication"))
+ "ENC-TS Pre-authentication") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials())
creds.set_username("badUser")
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
- server = os.environ["SERVER"]
+ server = os.environ["SERVER"]
path = "//%s/IPC$" % server
auth = "-N"
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
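Review bot: The expectation that the first Authentication message carries `EVT_ID_UNSUCCESSFUL_LOGON` is not explained, and next to the `EVT_ID_SUCCESSFUL_LOGON` asserted for the `ANONYMOUS LOGON` message that follows it reads like a mistake. A one-line comment above the added assertion would help, something like `# the initial No-Password attempt is logged as a failed logon; the ANONYMOUS LOGON that follows is the success`, with the wording confirmed against the status asserted earlier in the method, which is outside this hunk. Anyone alerting on failed-logon events from these logs needs to know that this failed/successful pair appears to be the expected output for an anonymous session. test_smb2_anonymous adds the same pair of assertions and would want the same comment.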
msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
msg["Authentication"]["becameAccount"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb2_anonymous(self):
def isLastExpectedMessage(msg):
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
- server = os.environ["SERVER"]
+ server = os.environ["SERVER"]
path = "//%s/IPC$" % server
auth = "-N"
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
# Check the second message it should be an Authentication
msg = messages[1]
msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
msg["Authentication"]["becameAccount"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_spnego(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv2",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_spnego_bad_password(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
(msg["Authentication"]["status"] ==
- "NT_STATUS_WRONG_PASSWORD"))
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
(msg["Authentication"]["status"] ==
- "NT_STATUS_NO_SUCH_USER"))
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv1",
msg["Authentication"]["passwordType"])
+ self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
+ msg["Authentication"]["eventId"])
+ self.assertEquals(EVT_LOGON_NETWORK,
+ msg["Authentication"]["logonType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
def isLastExpectedMessage(msg):
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
(msg["Authentication"]["status"] ==
- "NT_STATUS_WRONG_PASSWORD"))
+ "NT_STATUS_WRONG_PASSWORD") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
(msg["Authentication"]["status"] ==
- "NT_STATUS_NO_SUCH_USER"))
+ "NT_STATUS_NO_SUCH_USER") and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
"interactive") and
msg["Authentication"]["status"] == "NT_STATUS_OK" and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = "badUser"
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_INTERACTIVE))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
msg["Authentication"]["authDescription"] == "network" and
msg["Authentication"]["status"] == "NT_STATUS_OK" and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = "badUser"
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
(msg["Authentication"]["status"] == "NT_STATUS_OK") and
(msg["Authentication"]["passwordType"] == "MSCHAPv2") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d 0x00010000" % (
user, password, workstation, 2)
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["passwordType"] == "MSCHAPv2") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = "badPassword"
samlogon = "samlogon %s %s %s %d 0x00010000" % (
user, password, workstation, 2)
"NT_STATUS_NO_SUCH_USER") and
(msg["Authentication"]["passwordType"] == "MSCHAPv2") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = "badUser"
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_UNSUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = "badUser"
password = os.environ["PASSWORD"]
samlogon = "samlogon %s %s %s %d 0x00010000" % (
user, password, workstation, 2)
(msg["Authentication"]["authDescription"] == "network") and
(msg["Authentication"]["status"] == "NT_STATUS_OK") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
(msg["Authentication"]["authDescription"] == "network") and
(msg["Authentication"]["status"] == "NT_STATUS_OK") and
(msg["Authentication"]["workstation"] ==
- r"\\%s" % workstation))
-
- server = os.environ["SERVER"]
- user = os.environ["USERNAME"]
+ r"\\%s" % workstation) and
+ (msg["Authentication"]["eventId"] ==
+ EVT_ID_SUCCESSFUL_LOGON) and
+ (msg["Authentication"]["logonType"] ==
+ EVT_LOGON_NETWORK))
+
+ server = os.environ["SERVER"]
+ user = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
samlogon = "schannelsign;samlogon %s %s %s" % (
user, password, workstation)