def _test_rpc_ncacn_np(self, authTypes, creds, service,
binding, protection, checkFunction):
- def isLastExpectedMessage( msg):
- return (
- msg["type"] == "Authorization" and
- ( msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
- msg["Authorization"]["serviceDescription"] == service) and
- msg["Authorization"]["authType"] == authTypes[0] and
- msg["Authorization"]["transportProtection"] == protection
- )
+ def isLastExpectedMessage(msg):
+ return (msg["type"] == "Authorization" and
+ (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
+ msg["Authorization"]["serviceDescription"] == service) and
+ msg["Authorization"]["authType"] == authTypes[0] and
+ msg["Authorization"]["transportProtection"] == protection)
if binding:
binding = "[%s]" % binding
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
# Check the third message it should be an Authentication
# if we are expecting 4 messages
if expected_messages == 4:
- def checkServiceDescription( desc):
+ def checkServiceDescription(desc):
return (desc == "DCE/RPC" or desc == service)
msg = messages[2]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertTrue(
- checkServiceDescription( msg["Authentication"]["serviceDescription"]))
+ checkServiceDescription(msg["Authentication"]["serviceDescription"]))
self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
# This is almost certainly Authentication over UDP, and is probably
# returning message too big,
msg = messages[0]
def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
binding, protection, checkFunction):
- def isLastExpectedMessage( msg):
- return (
- msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
- msg["Authorization"]["authType"] == authTypes[0] and
- msg["Authorization"]["transportProtection"] == protection
- )
+ def isLastExpectedMessage(msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
+ msg["Authorization"]["authType"] == authTypes[0] and
+ msg["Authorization"]["transportProtection"] == protection)
if binding:
binding = "[%s]" % binding
creds)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
def test_ldap(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "SIGN" and
msg["Authorization"]["authType"] == "krb5")
lp = self.get_loadparm(),
credentials=self.get_credentials())
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
def test_ldap_ntlm(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "SEAL" and
msg["Authorization"]["authType"] == "NTLMSSP")
lp = self.get_loadparm(),
credentials=self.get_credentials())
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
def test_ldap_simple_bind(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "TLS" and
msg["Authorization"]["authType"] == "simple bind")
lp = self.get_loadparm(),
credentials=creds)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
msg["Authentication"]["authDescription"])
def test_ldap_simple_bind_bad_password(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"] == "LDAP" and
+ msg["Authentication"]["serviceDescription"] == "LDAP" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD" and
msg["Authentication"]["authDescription"] == "simple bind")
creds = self.insta_creds(template=self.get_credentials())
- creds.set_password( "badPassword")
+ creds.set_password("badPassword")
creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
creds.get_username()))
credentials=creds)
except LdbError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_ldap_simple_bind_bad_user(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"] == "LDAP" and
+ msg["Authentication"]["serviceDescription"] == "LDAP" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER" and
msg["Authentication"]["authDescription"] == "simple bind")
credentials=creds)
except LdbError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_ldap_simple_bind_unparseable_user(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"] == "LDAP" and
+ msg["Authentication"]["serviceDescription"] == "LDAP" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER" and
msg["Authentication"]["authDescription"] == "simple bind")
credentials=creds)
except LdbError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
len(messages),
"Did not receive the expected number of messages")
def test_smb(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
- msg["Authorization"]["serviceDescription"] == "SMB" and
- msg["Authorization"]["authType"] == "krb5" and
+ msg["Authorization"]["serviceDescription"] == "SMB" and
+ msg["Authorization"]["authType"] == "krb5" and
msg["Authorization"]["transportProtection"] == "SMB")
creds = self.insta_creds(template=self.get_credentials())
lp=self.get_loadparm(),
creds=creds)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
msg["Authentication"]["authDescription"])
def test_smb_bad_password(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"]
== "Kerberos KDC" and
creds=creds)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_bad_user(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"]
== "Kerberos KDC" and
creds=creds)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_anonymous(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
- msg["Authorization"]["authType"] == "NTLMSSP" and
+ msg["Authorization"]["authType"] == "NTLMSSP" and
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
auth = "-N"
call(["bin/smbclient", path, auth, "-c quit"])
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
self.assertEquals("NT_STATUS_NO_SUCH_USER",
msg["Authentication"]["status"])
self.assertEquals("SMB",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
- msg["Authentication"]["passwordType"])
+ msg["Authentication"]["passwordType"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("NT_STATUS_OK",
msg["Authentication"]["status"])
self.assertEquals("SMB",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
- msg["Authentication"]["passwordType"])
+ msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
- msg["Authentication"]["becameAccount"])
+ msg["Authentication"]["becameAccount"])
def test_smb_no_krb_spnego(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "NTLMSSP" and
lp=self.get_loadparm(),
creds=creds)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("SMB",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv2",
- msg["Authentication"]["passwordType"])
+ msg["Authentication"]["passwordType"])
def test_smb_no_krb_spnego_bad_password(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
creds=creds)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
creds=creds)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "bare-NTLM" and
"sysvol",
lp=self.get_loadparm(),
creds=creds,
- ntlmv2_auth = False,
- use_spnego = False )
+ ntlmv2_auth=False,
+ use_spnego=False)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
- # Check the first message it should be an Authentication
+ # Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("SMB",
- msg["Authentication"]["serviceDescription"])
+ msg["Authentication"]["serviceDescription"])
self.assertEquals("bare-NTLM",
- msg["Authentication"]["authDescription"])
+ msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv1",
- msg["Authentication"]["passwordType"])
+ msg["Authentication"]["passwordType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
"sysvol",
lp=self.get_loadparm(),
creds=creds,
- ntlmv2_auth = False,
- use_spnego = False )
+ ntlmv2_auth=False,
+ use_spnego=False)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
- def isLastExpectedMessage( msg):
+ def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
"sysvol",
lp=self.get_loadparm(),
creds=creds,
- ntlmv2_auth = False,
- use_spnego = False )
+ ntlmv2_auth=False,
+ use_spnego=False)
except NTSTATUSError:
thrown = True
- self.assertEquals( thrown, True)
+ self.assertEquals(thrown, True)
- messages = self.waitForMessages( isLastExpectedMessage)
+ messages = self.waitForMessages(isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")