1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
22 from samba.dcerpc import srvsvc, dnsserver
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 from samba.dcerpc.windows_event_ids import (
32 EVT_ID_SUCCESSFUL_LOGON,
33 EVT_ID_UNSUCCESSFUL_LOGON,
35 EVT_LOGON_INTERACTIVE,
36 EVT_LOGON_NETWORK_CLEAR_TEXT
41 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
44 super(AuthLogTests, self).setUp()
45 self.remoteAddress = os.environ["CLIENT_IP"]
48 super(AuthLogTests, self).tearDown()
50 def _test_rpc_ncacn_np(self, authTypes, creds, service,
51 binding, protection, checkFunction):
52 def isLastExpectedMessage(msg):
53 return (msg["type"] == "Authorization" and
54 (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
55 msg["Authorization"]["serviceDescription"] == service) and
56 msg["Authorization"]["authType"] == authTypes[0] and
57 msg["Authorization"]["transportProtection"] == protection)
60 binding = "[%s]" % binding
62 if service == "dnsserver":
63 x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
66 elif service == "srvsvc":
67 x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
71 # The connection is passed to ensure the server
72 # messaging context stays up until all the messages have been received.
73 messages = self.waitForMessages(isLastExpectedMessage, x)
74 checkFunction(messages, authTypes, service, binding, protection)
def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
    """Assert that serviceDescription fits the SMB dialect named in binding.

    binding is the bracketed, comma separated option string used when
    connecting (for example "[smb2,sign]"), or "" when no options were
    given.  An explicit "smb2" option requires a serviceDescription of
    "SMB2", an explicit "smb1" option requires "SMB"; with neither option
    the dialect is negotiated and either value is accepted.

    Raises AssertionError (through the TestCase assertions) on mismatch.
    """
    # Split on '[', ',' and ']' and drop the empty fragments that the
    # brackets (or an empty binding string) leave behind.  The pattern is
    # a raw string so the backslashes reach the regex engine untouched.
    binding_list = [opt for opt in re.split(r'[\[,\]]', binding) if opt]

    # Handle explicit smb2, smb1 or auto negotiation
    if "smb2" in binding_list:
        self.assertEqual(serviceDescription, "SMB2")
    elif "smb1" in binding_list:
        self.assertEqual(serviceDescription, "SMB")
    else:
        self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the log messages produced by an NTLM connection over ncacn_np.

    messages   -- the received log messages, in order; each is a mapping
                  with a "type" key naming the sub-mapping that holds the
                  message fields.
    authTypes  -- expected values; its length is the expected number of
                  messages.  [1] is the authDescription of the first
                  Authentication, [2] the authType of the Authorization
                  and, when four entries are given, [3] the
                  authDescription of the second Authentication.
    service    -- RPC service name, accepted as a serviceDescription of
                  the second Authentication alongside "DCE/RPC".
    binding    -- binding option string, used to decide which SMB
                  serviceDescription is acceptable.
    protection -- unused here; accepted so that all check callbacks can
                  be called with the same arguments.

    Raises AssertionError (through the TestCase assertions) on mismatch.
    """
    # NOTE(review): the signature tail, len(messages), the msg = ...
    # assignments and some assertion openers were rebuilt from the
    # sibling checks; confirm against the upstream file.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message, it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])

    # Check the second message, it should be an Authorization
    msg = messages[1]
    self.assertEqual("Authorization", msg["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the third message, it should be an Authentication,
    # if we are expecting 4 messages
    if expected_messages == 4:
        msg = messages[2]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
        # Either the generic "DCE/RPC" or the service name is accepted;
        # assertIn reports the offending value when the check fails.
        self.assertIn(msg["Authentication"]["serviceDescription"],
                      ("DCE/RPC", service))
        self.assertEqual(authTypes[3],
                         msg["Authentication"]["authDescription"])
        self.assertEqual(
            EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
        self.assertEqual(
            EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_np_krb5_check(
        self,
        messages,
        authTypes,
        service,
        binding,
        protection):
    """Check the log messages of a Kerberos connection over ncacn_np.

    messages   -- the received log messages, in order; each is a mapping
                  with a "type" key naming the sub-mapping that holds the
                  message fields.
    authTypes  -- expected values; its length is the expected number of
                  messages.  [1] and [2] are the authDescription of the
                  first and second "Kerberos KDC" Authentication, [3] is
                  the authType of the Authorization.
    service    -- unused here.
    binding    -- binding option string, used to decide which SMB
                  serviceDescription is acceptable.
    protection -- unused here.

    service and protection are accepted so that all check callbacks can
    be called with the same arguments.

    Raises AssertionError (through the TestCase assertions) on mismatch.
    """
    # NOTE(review): the parameter list, len(messages), the msg = ...
    # assignments and some assertion openers were rebuilt from the call
    # site and the sibling checks; confirm against the upstream file.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message, it should be an Authentication.
    # This is almost certainly Authentication over UDP, and is probably
    # returning message too big.
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the second message, it should be an Authentication.
    # This is the TCP Authentication in response to the message too big
    # response to the UDP Authentication.
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the third message, it should be an Authorization
    msg = messages[2]
    self.assertEqual("Authorization", msg["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[3], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
192 def test_rpc_ncacn_np_ntlm_dns_sign(self):
193 creds = self.insta_creds(template=self.get_credentials(),
194 kerberos_state=DONT_USE_KERBEROS)
195 self._test_rpc_ncacn_np(["NTLMSSP",
199 creds, "dnsserver", "sign", "SIGN",
200 self.rpc_ncacn_np_ntlm_check)
202 def test_rpc_ncacn_np_ntlm_srv_sign(self):
203 creds = self.insta_creds(template=self.get_credentials(),
204 kerberos_state=DONT_USE_KERBEROS)
205 self._test_rpc_ncacn_np(["NTLMSSP",
209 creds, "srvsvc", "sign", "SIGN",
210 self.rpc_ncacn_np_ntlm_check)
212 def test_rpc_ncacn_np_ntlm_dns(self):
213 creds = self.insta_creds(template=self.get_credentials(),
214 kerberos_state=DONT_USE_KERBEROS)
215 self._test_rpc_ncacn_np(["ncacn_np",
218 creds, "dnsserver", "", "SMB",
219 self.rpc_ncacn_np_ntlm_check)
221 def test_rpc_ncacn_np_ntlm_srv(self):
222 creds = self.insta_creds(template=self.get_credentials(),
223 kerberos_state=DONT_USE_KERBEROS)
224 self._test_rpc_ncacn_np(["ncacn_np",
227 creds, "srvsvc", "", "SMB",
228 self.rpc_ncacn_np_ntlm_check)
230 def test_rpc_ncacn_np_krb_dns_sign(self):
231 creds = self.insta_creds(template=self.get_credentials(),
232 kerberos_state=MUST_USE_KERBEROS)
233 self._test_rpc_ncacn_np(["krb5",
234 "ENC-TS Pre-authentication",
235 "ENC-TS Pre-authentication",
237 creds, "dnsserver", "sign", "SIGN",
238 self.rpc_ncacn_np_krb5_check)
240 def test_rpc_ncacn_np_krb_srv_sign(self):
241 creds = self.insta_creds(template=self.get_credentials(),
242 kerberos_state=MUST_USE_KERBEROS)
243 self._test_rpc_ncacn_np(["krb5",
244 "ENC-TS Pre-authentication",
245 "ENC-TS Pre-authentication",
247 creds, "srvsvc", "sign", "SIGN",
248 self.rpc_ncacn_np_krb5_check)
250 def test_rpc_ncacn_np_krb_dns(self):
251 creds = self.insta_creds(template=self.get_credentials(),
252 kerberos_state=MUST_USE_KERBEROS)
253 self._test_rpc_ncacn_np(["ncacn_np",
254 "ENC-TS Pre-authentication",
255 "ENC-TS Pre-authentication",
257 creds, "dnsserver", "", "SMB",
258 self.rpc_ncacn_np_krb5_check)
260 def test_rpc_ncacn_np_krb_dns_smb2(self):
261 creds = self.insta_creds(template=self.get_credentials(),
262 kerberos_state=MUST_USE_KERBEROS)
263 self._test_rpc_ncacn_np(["ncacn_np",
264 "ENC-TS Pre-authentication",
265 "ENC-TS Pre-authentication",
267 creds, "dnsserver", "smb2", "SMB",
268 self.rpc_ncacn_np_krb5_check)
270 def test_rpc_ncacn_np_krb_srv(self):
271 creds = self.insta_creds(template=self.get_credentials(),
272 kerberos_state=MUST_USE_KERBEROS)
273 self._test_rpc_ncacn_np(["ncacn_np",
274 "ENC-TS Pre-authentication",
275 "ENC-TS Pre-authentication",
277 creds, "srvsvc", "", "SMB",
278 self.rpc_ncacn_np_krb5_check)
280 def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
281 binding, protection, checkFunction):
282 def isLastExpectedMessage(msg):
283 return (msg["type"] == "Authorization" and
284 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
285 msg["Authorization"]["authType"] == authTypes[0] and
286 msg["Authorization"]["transportProtection"] == protection)
289 binding = "[%s]" % binding
291 if service == "dnsserver":
292 conn = dnsserver.dnsserver(
293 "ncacn_ip_tcp:%s%s" % (self.server, binding),
296 elif service == "srvsvc":
297 conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
301 messages = self.waitForMessages(isLastExpectedMessage, conn)
302 checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the log messages of an NTLM connection over ncacn_ip_tcp.

    messages   -- the received log messages, in order; each is a mapping
                  with a "type" key naming the sub-mapping that holds the
                  message fields.
    authTypes  -- expected values; its length is the expected number of
                  messages.  [1] is the authType of the leading
                  Authorization, [2] the authDescription of the
                  Authentication that follows it.
    service, binding, protection -- unused here; accepted so that all
                  check callbacks can be called with the same arguments.

    Raises AssertionError (through the TestCase assertions) on mismatch.
    """
    # NOTE(review): len(messages), the msg = ... assignments and some
    # assertion openers were rebuilt from the sibling checks; confirm
    # against the upstream file.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message, it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
    self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the second message, it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("DCE/RPC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the log messages of a Kerberos connection over ncacn_ip_tcp.

    messages   -- the received log messages, in order; each is a mapping
                  with a "type" key naming the sub-mapping that holds the
                  message fields.
    authTypes  -- expected values; its length is the expected number of
                  messages.  [1] is the authType of the leading
                  Authorization, [2] the authDescription compared against
                  both "Kerberos KDC" Authentication messages.
    service, binding, protection -- unused here; accepted so that all
                  check callbacks can be called with the same arguments.

    Raises AssertionError (through the TestCase assertions) on mismatch.
    """
    # NOTE(review): len(messages), the msg = ... assignments and some
    # assertion openers were rebuilt from the sibling checks; confirm
    # against the upstream file.
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message, it should be an Authorization
    msg = messages[0]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
    self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    # Check the second message, it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])

    # Check the third message, it should be an Authentication.
    # It is compared against authTypes[2], the same entry as the second
    # message.
    msg = messages[2]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])
    self.assertEqual(
        EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
    self.assertEqual(
        EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
377 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
378 creds = self.insta_creds(template=self.get_credentials(),
379 kerberos_state=DONT_USE_KERBEROS)
380 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
383 creds, "dnsserver", "sign", "SIGN",
384 self.rpc_ncacn_ip_tcp_ntlm_check)
386 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
387 creds = self.insta_creds(template=self.get_credentials(),
388 kerberos_state=MUST_USE_KERBEROS)
389 self._test_rpc_ncacn_ip_tcp(["krb5",
391 "ENC-TS Pre-authentication",
392 "ENC-TS Pre-authentication"],
393 creds, "dnsserver", "sign", "SIGN",
394 self.rpc_ncacn_ip_tcp_krb5_check)
396 def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
397 creds = self.insta_creds(template=self.get_credentials(),
398 kerberos_state=DONT_USE_KERBEROS)
399 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
402 creds, "dnsserver", "", "SIGN",
403 self.rpc_ncacn_ip_tcp_ntlm_check)
405 def test_rpc_ncacn_ip_tcp_krb5_dns(self):
406 creds = self.insta_creds(template=self.get_credentials(),
407 kerberos_state=MUST_USE_KERBEROS)
408 self._test_rpc_ncacn_ip_tcp(["krb5",
410 "ENC-TS Pre-authentication",
411 "ENC-TS Pre-authentication"],
412 creds, "dnsserver", "", "SIGN",
413 self.rpc_ncacn_ip_tcp_krb5_check)
415 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
416 creds = self.insta_creds(template=self.get_credentials(),
417 kerberos_state=DONT_USE_KERBEROS)
418 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
421 creds, "dnsserver", "connect", "NONE",
422 self.rpc_ncacn_ip_tcp_ntlm_check)
424 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
425 creds = self.insta_creds(template=self.get_credentials(),
426 kerberos_state=MUST_USE_KERBEROS)
427 self._test_rpc_ncacn_ip_tcp(["krb5",
429 "ENC-TS Pre-authentication",
430 "ENC-TS Pre-authentication"],
431 creds, "dnsserver", "connect", "NONE",
432 self.rpc_ncacn_ip_tcp_krb5_check)
434 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
435 creds = self.insta_creds(template=self.get_credentials(),
436 kerberos_state=DONT_USE_KERBEROS)
437 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
440 creds, "dnsserver", "seal", "SEAL",
441 self.rpc_ncacn_ip_tcp_ntlm_check)
443 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
444 creds = self.insta_creds(template=self.get_credentials(),
445 kerberos_state=MUST_USE_KERBEROS)
446 self._test_rpc_ncacn_ip_tcp(["krb5",
448 "ENC-TS Pre-authentication",
449 "ENC-TS Pre-authentication"],
450 creds, "dnsserver", "seal", "SEAL",
451 self.rpc_ncacn_ip_tcp_krb5_check)
455 def isLastExpectedMessage(msg):
456 return (msg["type"] == "Authorization" and
457 msg["Authorization"]["serviceDescription"] == "LDAP" and
458 msg["Authorization"]["transportProtection"] == "SIGN" and
459 msg["Authorization"]["authType"] == "krb5")
461 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
462 lp=self.get_loadparm(),
463 credentials=self.get_credentials())
465 messages = self.waitForMessages(isLastExpectedMessage)
468 "Did not receive the expected number of messages")
470 # Check the first message it should be an Authentication
472 self.assertEquals("Authentication", msg["type"])
473 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
474 self.assertEquals("Kerberos KDC",
475 msg["Authentication"]["serviceDescription"])
476 self.assertEquals("ENC-TS Pre-authentication",
477 msg["Authentication"]["authDescription"])
478 self.assertTrue(msg["Authentication"]["duration"] > 0)
480 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
482 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
484 # Check the second message it should be an Authentication
486 self.assertEquals("Authentication", msg["type"])
487 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
488 self.assertEquals("Kerberos KDC",
489 msg["Authentication"]["serviceDescription"])
490 self.assertEquals("ENC-TS Pre-authentication",
491 msg["Authentication"]["authDescription"])
492 self.assertTrue(msg["Authentication"]["duration"] > 0)
494 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
496 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
498 def test_ldap_ntlm(self):
500 def isLastExpectedMessage(msg):
501 return (msg["type"] == "Authorization" and
502 msg["Authorization"]["serviceDescription"] == "LDAP" and
503 msg["Authorization"]["transportProtection"] == "SEAL" and
504 msg["Authorization"]["authType"] == "NTLMSSP")
506 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
507 lp=self.get_loadparm(),
508 credentials=self.get_credentials())
510 messages = self.waitForMessages(isLastExpectedMessage)
513 "Did not receive the expected number of messages")
514 # Check the first message it should be an Authentication
516 self.assertEquals("Authentication", msg["type"])
517 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
518 self.assertEquals("LDAP",
519 msg["Authentication"]["serviceDescription"])
520 self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
521 self.assertTrue(msg["Authentication"]["duration"] > 0)
523 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
525 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
527 def test_ldap_simple_bind(self):
528 def isLastExpectedMessage(msg):
529 return (msg["type"] == "Authorization" and
530 msg["Authorization"]["serviceDescription"] == "LDAP" and
531 msg["Authorization"]["transportProtection"] == "TLS" and
532 msg["Authorization"]["authType"] == "simple bind")
534 creds = self.insta_creds(template=self.get_credentials())
535 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
536 creds.get_username()))
538 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
539 lp=self.get_loadparm(),
542 messages = self.waitForMessages(isLastExpectedMessage)
545 "Did not receive the expected number of messages")
547 # Check the first message it should be an Authentication
549 self.assertEquals("Authentication", msg["type"])
550 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
551 self.assertEquals("LDAP",
552 msg["Authentication"]["serviceDescription"])
553 self.assertEquals("simple bind",
554 msg["Authentication"]["authDescription"])
556 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
558 EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
560 def test_ldap_simple_bind_bad_password(self):
561 def isLastExpectedMessage(msg):
562 return (msg["type"] == "Authentication" and
563 msg["Authentication"]["serviceDescription"] == "LDAP" and
564 (msg["Authentication"]["status"] ==
565 "NT_STATUS_WRONG_PASSWORD") and
566 (msg["Authentication"]["authDescription"] ==
568 (msg["Authentication"]["eventId"] ==
569 EVT_ID_UNSUCCESSFUL_LOGON) and
570 (msg["Authentication"]["logonType"] ==
571 EVT_LOGON_NETWORK_CLEAR_TEXT))
573 creds = self.insta_creds(template=self.get_credentials())
574 creds.set_password("badPassword")
575 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
576 creds.get_username()))
580 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
581 lp=self.get_loadparm(),
585 self.assertEquals(thrown, True)
587 messages = self.waitForMessages(isLastExpectedMessage)
590 "Did not receive the expected number of messages")
592 def test_ldap_simple_bind_bad_user(self):
593 def isLastExpectedMessage(msg):
594 return (msg["type"] == "Authentication" and
595 msg["Authentication"]["serviceDescription"] == "LDAP" and
596 (msg["Authentication"]["status"] ==
597 "NT_STATUS_NO_SUCH_USER") and
598 (msg["Authentication"]["authDescription"] ==
600 (msg["Authentication"]["eventId"] ==
601 EVT_ID_UNSUCCESSFUL_LOGON) and
602 (msg["Authentication"]["logonType"] ==
603 EVT_LOGON_NETWORK_CLEAR_TEXT))
605 creds = self.insta_creds(template=self.get_credentials())
606 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
610 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
611 lp=self.get_loadparm(),
615 self.assertEquals(thrown, True)
617 messages = self.waitForMessages(isLastExpectedMessage)
620 "Did not receive the expected number of messages")
622 def test_ldap_simple_bind_unparseable_user(self):
623 def isLastExpectedMessage(msg):
624 return (msg["type"] == "Authentication" and
625 msg["Authentication"]["serviceDescription"] == "LDAP" and
626 (msg["Authentication"]["status"] ==
627 "NT_STATUS_NO_SUCH_USER") and
628 (msg["Authentication"]["authDescription"] ==
630 (msg["Authentication"]["eventId"] ==
631 EVT_ID_UNSUCCESSFUL_LOGON) and
632 (msg["Authentication"]["logonType"] ==
633 EVT_LOGON_NETWORK_CLEAR_TEXT))
635 creds = self.insta_creds(template=self.get_credentials())
636 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
640 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
641 lp=self.get_loadparm(),
645 self.assertEquals(thrown, True)
647 messages = self.waitForMessages(isLastExpectedMessage)
650 "Did not receive the expected number of messages")
653 # Note: as this test does not expect any messages it will
654 # time out in the call to self.waitForMessages.
655 # This is expected, but it will slow this test.
656 def test_ldap_anonymous_access_bind_only(self):
657 # Should be no logging for anonymous bind
658 # so receiving any message indicates a failure.
659 def isLastExpectedMessage(msg):
662 creds = self.insta_creds(template=self.get_credentials())
663 creds.set_anonymous()
665 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
666 lp=self.get_loadparm(),
669 messages = self.waitForMessages(isLastExpectedMessage)
672 "Did not receive the expected number of messages")
674 def test_ldap_anonymous_access(self):
675 def isLastExpectedMessage(msg):
676 return (msg["type"] == "Authorization" and
677 msg["Authorization"]["serviceDescription"] == "LDAP" and
678 msg["Authorization"]["transportProtection"] == "TLS" and
679 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
680 msg["Authorization"]["authType"] == "no bind")
682 creds = self.insta_creds(template=self.get_credentials())
683 creds.set_anonymous()
685 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
686 lp=self.get_loadparm(),
690 self.samdb.search(base=self.samdb.domain_dn())
691 self.fail("Expected an LdbError exception")
695 messages = self.waitForMessages(isLastExpectedMessage)
698 "Did not receive the expected number of messages")
701 def isLastExpectedMessage(msg):
702 return (msg["type"] == "Authorization" and
703 msg["Authorization"]["serviceDescription"] == "SMB" and
704 msg["Authorization"]["authType"] == "krb5" and
705 msg["Authorization"]["transportProtection"] == "SMB")
707 creds = self.insta_creds(template=self.get_credentials())
710 lp=self.get_loadparm(),
713 messages = self.waitForMessages(isLastExpectedMessage)
716 "Did not receive the expected number of messages")
717 # Check the first message it should be an Authentication
719 self.assertEquals("Authentication", msg["type"])
720 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
721 self.assertEquals("Kerberos KDC",
722 msg["Authentication"]["serviceDescription"])
723 self.assertEquals("ENC-TS Pre-authentication",
724 msg["Authentication"]["authDescription"])
725 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
726 msg["Authentication"]["eventId"])
727 self.assertEquals(EVT_LOGON_NETWORK,
728 msg["Authentication"]["logonType"])
730 # Check the second message it should be an Authentication
732 self.assertEquals("Authentication", msg["type"])
733 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
734 self.assertEquals("Kerberos KDC",
735 msg["Authentication"]["serviceDescription"])
736 self.assertEquals("ENC-TS Pre-authentication",
737 msg["Authentication"]["authDescription"])
738 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
739 msg["Authentication"]["eventId"])
740 self.assertEquals(EVT_LOGON_NETWORK,
741 msg["Authentication"]["logonType"])
743 def test_smb_bad_password(self):
744 def isLastExpectedMessage(msg):
745 return (msg["type"] == "Authentication" and
746 (msg["Authentication"]["serviceDescription"] ==
748 (msg["Authentication"]["status"] ==
749 "NT_STATUS_WRONG_PASSWORD") and
750 (msg["Authentication"]["authDescription"] ==
751 "ENC-TS Pre-authentication"))
753 creds = self.insta_creds(template=self.get_credentials())
754 creds.set_password("badPassword")
760 lp=self.get_loadparm(),
762 except NTSTATUSError:
764 self.assertEquals(thrown, True)
766 messages = self.waitForMessages(isLastExpectedMessage)
769 "Did not receive the expected number of messages")
771 def test_smb_bad_user(self):
772 def isLastExpectedMessage(msg):
773 return (msg["type"] == "Authentication" and
774 (msg["Authentication"]["serviceDescription"] ==
776 (msg["Authentication"]["status"] ==
777 "NT_STATUS_NO_SUCH_USER") and
778 (msg["Authentication"]["authDescription"] ==
779 "ENC-TS Pre-authentication") and
780 (msg["Authentication"]["eventId"] ==
781 EVT_ID_UNSUCCESSFUL_LOGON) and
782 (msg["Authentication"]["logonType"] ==
785 creds = self.insta_creds(template=self.get_credentials())
786 creds.set_username("badUser")
792 lp=self.get_loadparm(),
794 except NTSTATUSError:
796 self.assertEquals(thrown, True)
798 messages = self.waitForMessages(isLastExpectedMessage)
801 "Did not receive the expected number of messages")
803 def test_smb1_anonymous(self):
804 def isLastExpectedMessage(msg):
805 return (msg["type"] == "Authorization" and
806 msg["Authorization"]["serviceDescription"] == "SMB" and
807 msg["Authorization"]["authType"] == "NTLMSSP" and
808 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
809 msg["Authorization"]["transportProtection"] == "SMB")
811 server = os.environ["SERVER"]
813 path = "//%s/IPC$" % server
815 call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
817 messages = self.waitForMessages(isLastExpectedMessage)
820 "Did not receive the expected number of messages")
822 # Check the first message it should be an Authentication
824 self.assertEquals("Authentication", msg["type"])
825 self.assertEquals("NT_STATUS_NO_SUCH_USER",
826 msg["Authentication"]["status"])
827 self.assertEquals("SMB",
828 msg["Authentication"]["serviceDescription"])
829 self.assertEquals("NTLMSSP",
830 msg["Authentication"]["authDescription"])
831 self.assertEquals("No-Password",
832 msg["Authentication"]["passwordType"])
833 self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
834 msg["Authentication"]["eventId"])
835 self.assertEquals(EVT_LOGON_NETWORK,
836 msg["Authentication"]["logonType"])
838 # Check the second message it should be an Authentication
840 self.assertEquals("Authentication", msg["type"])
841 self.assertEquals("NT_STATUS_OK",
842 msg["Authentication"]["status"])
843 self.assertEquals("SMB",
844 msg["Authentication"]["serviceDescription"])
845 self.assertEquals("NTLMSSP",
846 msg["Authentication"]["authDescription"])
847 self.assertEquals("No-Password",
848 msg["Authentication"]["passwordType"])
849 self.assertEquals("ANONYMOUS LOGON",
850 msg["Authentication"]["becameAccount"])
851 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
852 msg["Authentication"]["eventId"])
853 self.assertEquals(EVT_LOGON_NETWORK,
854 msg["Authentication"]["logonType"])
856 def test_smb2_anonymous(self):
857 def isLastExpectedMessage(msg):
858 return (msg["type"] == "Authorization" and
859 msg["Authorization"]["serviceDescription"] == "SMB2" and
860 msg["Authorization"]["authType"] == "NTLMSSP" and
861 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
862 msg["Authorization"]["transportProtection"] == "SMB")
864 server = os.environ["SERVER"]
866 path = "//%s/IPC$" % server
868 call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
870 messages = self.waitForMessages(isLastExpectedMessage)
873 "Did not receive the expected number of messages")
875 # Check the first message it should be an Authentication
877 self.assertEquals("Authentication", msg["type"])
878 self.assertEquals("NT_STATUS_NO_SUCH_USER",
879 msg["Authentication"]["status"])
880 self.assertEquals("SMB2",
881 msg["Authentication"]["serviceDescription"])
882 self.assertEquals("NTLMSSP",
883 msg["Authentication"]["authDescription"])
884 self.assertEquals("No-Password",
885 msg["Authentication"]["passwordType"])
886 self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
887 msg["Authentication"]["eventId"])
888 self.assertEquals(EVT_LOGON_NETWORK,
889 msg["Authentication"]["logonType"])
891 # Check the second message it should be an Authentication
893 self.assertEquals("Authentication", msg["type"])
894 self.assertEquals("NT_STATUS_OK",
895 msg["Authentication"]["status"])
896 self.assertEquals("SMB2",
897 msg["Authentication"]["serviceDescription"])
898 self.assertEquals("NTLMSSP",
899 msg["Authentication"]["authDescription"])
900 self.assertEquals("No-Password",
901 msg["Authentication"]["passwordType"])
902 self.assertEquals("ANONYMOUS LOGON",
903 msg["Authentication"]["becameAccount"])
904 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
905 msg["Authentication"]["eventId"])
906 self.assertEquals(EVT_LOGON_NETWORK,
907 msg["Authentication"]["logonType"])
909 def test_smb_no_krb_spnego(self):
910 def isLastExpectedMessage(msg):
911 return (msg["type"] == "Authorization" and
912 msg["Authorization"]["serviceDescription"] == "SMB" and
913 msg["Authorization"]["authType"] == "NTLMSSP" and
914 msg["Authorization"]["transportProtection"] == "SMB")
916 creds = self.insta_creds(template=self.get_credentials(),
917 kerberos_state=DONT_USE_KERBEROS)
920 lp=self.get_loadparm(),
923 messages = self.waitForMessages(isLastExpectedMessage)
926 "Did not receive the expected number of messages")
927 # Check the first message it should be an Authentication
929 self.assertEquals("Authentication", msg["type"])
930 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
931 self.assertEquals("SMB",
932 msg["Authentication"]["serviceDescription"])
933 self.assertEquals("NTLMSSP",
934 msg["Authentication"]["authDescription"])
935 self.assertEquals("NTLMv2",
936 msg["Authentication"]["passwordType"])
937 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
938 msg["Authentication"]["eventId"])
939 self.assertEquals(EVT_LOGON_NETWORK,
940 msg["Authentication"]["logonType"])
942 def test_smb_no_krb_spnego_bad_password(self):
943 def isLastExpectedMessage(msg):
944 return (msg["type"] == "Authentication" and
945 msg["Authentication"]["serviceDescription"] == "SMB" and
946 msg["Authentication"]["authDescription"] == "NTLMSSP" and
947 msg["Authentication"]["passwordType"] == "NTLMv2" and
948 (msg["Authentication"]["status"] ==
949 "NT_STATUS_WRONG_PASSWORD") and
950 (msg["Authentication"]["eventId"] ==
951 EVT_ID_UNSUCCESSFUL_LOGON) and
952 (msg["Authentication"]["logonType"] ==
955 creds = self.insta_creds(template=self.get_credentials(),
956 kerberos_state=DONT_USE_KERBEROS)
957 creds.set_password("badPassword")
963 lp=self.get_loadparm(),
965 except NTSTATUSError:
967 self.assertEquals(thrown, True)
969 messages = self.waitForMessages(isLastExpectedMessage)
972 "Did not receive the expected number of messages")
# Test an SMB logon over NTLMSSP (Kerberos disabled) with an unknown user.
# The predicate below waits for a failed NTLMv2 "Authentication" record with
# status NT_STATUS_NO_SUCH_USER; exactly one message is expected.
974 def test_smb_no_krb_spnego_bad_user(self):
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
975 def isLastExpectedMessage(msg):
976 return (msg["type"] == "Authentication" and
977 msg["Authentication"]["serviceDescription"] == "SMB" and
978 msg["Authentication"]["authDescription"] == "NTLMSSP" and
979 msg["Authentication"]["passwordType"] == "NTLMv2" and
980 (msg["Authentication"]["status"] ==
981 "NT_STATUS_NO_SUCH_USER") and
982 (msg["Authentication"]["eventId"] ==
983 EVT_ID_UNSUCCESSFUL_LOGON) and
984 (msg["Authentication"]["logonType"] ==
# Start from the default test credentials with Kerberos switched off, then
# replace the user name with one that should not exist.
987 creds = self.insta_creds(template=self.get_credentials(),
988 kerberos_state=DONT_USE_KERBEROS)
989 creds.set_username("badUser")
995 lp=self.get_loadparm(),
997 except NTSTATUSError:
# The attempt is required to fail: `thrown` must be True here
# (presumably set by the NTSTATUSError handler above).
999 self.assertEquals(thrown, True)
1001 messages = self.waitForMessages(isLastExpectedMessage)
# Only the single failed authentication record is expected.
1002 self.assertEquals(1,
1004 "Did not receive the expected number of messages")
# Test a successful SMB logon with Kerberos disabled where the server is
# expected to log bare NTLM with an NTLMv1 password. Two messages are
# expected; the wait ends on the "Authorization" record, and the
# "Authentication" record is then checked field by field.
1006 def test_smb_no_krb_no_spnego_no_ntlmv2(self):
# Predicate handed to waitForMessages(): matches the SMB "Authorization"
# record with authType "bare-NTLM".
1007 def isLastExpectedMessage(msg):
1008 return (msg["type"] == "Authorization" and
1009 msg["Authorization"]["serviceDescription"] == "SMB" and
1010 msg["Authorization"]["authType"] == "bare-NTLM" and
1011 msg["Authorization"]["transportProtection"] == "SMB")
# Default test credentials with Kerberos switched off.
1013 creds = self.insta_creds(template=self.get_credentials(),
1014 kerberos_state=DONT_USE_KERBEROS)
# Open the SMB connection; the returned object is not kept, only the
# logging side effect matters to this test.
1015 smb.SMB(self.server,
1017 lp=self.get_loadparm(),
1022 messages = self.waitForMessages(isLastExpectedMessage)
1023 self.assertEquals(2,
1025 "Did not receive the expected number of messages")
1026 # Check the first message it should be an Authentication
# The record must report a successful (NT_STATUS_OK) network logon over SMB
# using bare NTLM with an NTLMv1 password.
1028 self.assertEquals("Authentication", msg["type"])
1029 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
1030 self.assertEquals("SMB",
1031 msg["Authentication"]["serviceDescription"])
1032 self.assertEquals("bare-NTLM",
1033 msg["Authentication"]["authDescription"])
1034 self.assertEquals("NTLMv1",
1035 msg["Authentication"]["passwordType"])
1036 self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
1037 msg["Authentication"]["eventId"])
1038 self.assertEquals(EVT_LOGON_NETWORK,
1039 msg["Authentication"]["logonType"])
# Test an SMB logon (Kerberos disabled, bare NTLM / NTLMv1 expected in the
# log) with a wrong password. The attempt must fail and exactly one message,
# a failed "Authentication" record with NT_STATUS_WRONG_PASSWORD, is expected.
1041 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1042 def isLastExpectedMessage(msg):
1043 return (msg["type"] == "Authentication" and
1044 msg["Authentication"]["serviceDescription"] == "SMB" and
1045 msg["Authentication"]["authDescription"] == "bare-NTLM" and
1046 msg["Authentication"]["passwordType"] == "NTLMv1" and
1047 (msg["Authentication"]["status"] ==
1048 "NT_STATUS_WRONG_PASSWORD") and
1049 (msg["Authentication"]["eventId"] ==
1050 EVT_ID_UNSUCCESSFUL_LOGON) and
1051 (msg["Authentication"]["logonType"] ==
# Start from the default test credentials with Kerberos switched off, then
# replace the password with a bad one.
1054 creds = self.insta_creds(template=self.get_credentials(),
1055 kerberos_state=DONT_USE_KERBEROS)
1056 creds.set_password("badPassword")
# Attempt the SMB connection; an NTSTATUSError is the expected outcome.
1060 smb.SMB(self.server,
1062 lp=self.get_loadparm(),
1066 except NTSTATUSError:
# The attempt is required to fail: `thrown` must be True here
# (presumably set by the NTSTATUSError handler above).
1068 self.assertEquals(thrown, True)
1070 messages = self.waitForMessages(isLastExpectedMessage)
# Only the single failed authentication record is expected.
1071 self.assertEquals(1,
1073 "Did not receive the expected number of messages")
# Test an SMB logon (Kerberos disabled, bare NTLM / NTLMv1 expected in the
# log) with an unknown user. The attempt must fail and exactly one message,
# a failed "Authentication" record with NT_STATUS_NO_SUCH_USER, is expected.
1075 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1076 def isLastExpectedMessage(msg):
1077 return (msg["type"] == "Authentication" and
1078 msg["Authentication"]["serviceDescription"] == "SMB" and
1079 msg["Authentication"]["authDescription"] == "bare-NTLM" and
1080 msg["Authentication"]["passwordType"] == "NTLMv1" and
1081 (msg["Authentication"]["status"] ==
1082 "NT_STATUS_NO_SUCH_USER") and
1083 (msg["Authentication"]["eventId"] ==
1084 EVT_ID_UNSUCCESSFUL_LOGON) and
1085 (msg["Authentication"]["logonType"] ==
# Start from the default test credentials with Kerberos switched off, then
# replace the user name with one that should not exist.
1088 creds = self.insta_creds(template=self.get_credentials(),
1089 kerberos_state=DONT_USE_KERBEROS)
1090 creds.set_username("badUser")
# Attempt the SMB connection; an NTSTATUSError is the expected outcome.
1094 smb.SMB(self.server,
1096 lp=self.get_loadparm(),
1100 except NTSTATUSError:
# The attempt is required to fail: `thrown` must be True here
# (presumably set by the NTSTATUSError handler above).
1102 self.assertEquals(thrown, True)
1104 messages = self.waitForMessages(isLastExpectedMessage)
# Only the single failed authentication record is expected.
1105 self.assertEquals(1,
1107 "Did not receive the expected number of messages")
# Test a successful interactive SamLogon driven through the rpcclient binary.
# The wait ends on a successful (NT_STATUS_OK) "Authentication" record whose
# logonType is EVT_LOGON_INTERACTIVE and whose workstation is \\AuthLogTests.
1109 def test_samlogon_interactive(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1111 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1113 def isLastExpectedMessage(msg):
1114 return (msg["type"] == "Authentication" and
1115 (msg["Authentication"]["serviceDescription"] ==
1117 (msg["Authentication"]["authDescription"] ==
1119 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1120 (msg["Authentication"]["workstation"] ==
1121 r"\\%s" % workstation) and
1122 (msg["Authentication"]["eventId"] ==
1123 EVT_ID_SUCCESSFUL_LOGON) and
1124 (msg["Authentication"]["logonType"] ==
1125 EVT_LOGON_INTERACTIVE))
# Target server and account come from the test environment.
1127 server = os.environ["SERVER"]
1128 user = os.environ["USERNAME"]
1129 password = os.environ["PASSWORD"]
# Trailing 1 is the last samlogon argument; presumably the logon level that
# selects an interactive logon - TODO confirm against rpcclient.
1130 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
# The exit status of rpcclient is not checked; only the logged messages are
# inspected below.
1132 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1134 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1135 messages = self.remove_netlogon_messages(messages)
1136 received = len(messages)
1138 (received == 5 or received == 6),
1139 "Did not receive the expected number of messages")
# Test an interactive SamLogon through rpcclient with a wrong password.
# The wait ends on a failed "Authentication" record with status
# NT_STATUS_WRONG_PASSWORD and logonType EVT_LOGON_INTERACTIVE.
1141 def test_samlogon_interactive_bad_password(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1143 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1145 def isLastExpectedMessage(msg):
1146 return (msg["type"] == "Authentication" and
1147 (msg["Authentication"]["serviceDescription"] ==
1149 (msg["Authentication"]["authDescription"] ==
1151 (msg["Authentication"]["status"] ==
1152 "NT_STATUS_WRONG_PASSWORD") and
1153 (msg["Authentication"]["workstation"] ==
1154 r"\\%s" % workstation) and
1155 (msg["Authentication"]["eventId"] ==
1156 EVT_ID_UNSUCCESSFUL_LOGON) and
1157 (msg["Authentication"]["logonType"] ==
1158 EVT_LOGON_INTERACTIVE))
# Real server and user from the environment, but a deliberately wrong
# password.
1160 server = os.environ["SERVER"]
1161 user = os.environ["USERNAME"]
1162 password = "badPassword"
1163 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1165 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1167 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1168 messages = self.remove_netlogon_messages(messages)
1169 received = len(messages)
1171 (received == 5 or received == 6),
1172 "Did not receive the expected number of messages")
# Test an interactive SamLogon through rpcclient for an unknown user.
# The wait ends on a failed "Authentication" record with status
# NT_STATUS_NO_SUCH_USER and logonType EVT_LOGON_INTERACTIVE.
1174 def test_samlogon_interactive_bad_user(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1176 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1178 def isLastExpectedMessage(msg):
1179 return (msg["type"] == "Authentication" and
1180 (msg["Authentication"]["serviceDescription"] ==
1182 (msg["Authentication"]["authDescription"] ==
1184 (msg["Authentication"]["status"] ==
1185 "NT_STATUS_NO_SUCH_USER") and
1186 (msg["Authentication"]["workstation"] ==
1187 r"\\%s" % workstation) and
1188 (msg["Authentication"]["eventId"] ==
1189 EVT_ID_UNSUCCESSFUL_LOGON) and
1190 (msg["Authentication"]["logonType"] ==
1191 EVT_LOGON_INTERACTIVE))
# Server and password come from the test environment.
1193 server = os.environ["SERVER"]
1195 password = os.environ["PASSWORD"]
1196 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1198 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1200 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1201 messages = self.remove_netlogon_messages(messages)
1202 received = len(messages)
1204 (received == 5 or received == 6),
1205 "Did not receive the expected number of messages")
# Test a successful network SamLogon driven through the rpcclient binary.
# The wait ends on a successful (NT_STATUS_OK) "Authentication" record with
# authDescription "network" and workstation \\AuthLogTests.
1207 def test_samlogon_network(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1209 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1211 def isLastExpectedMessage(msg):
1212 return (msg["type"] == "Authentication" and
1213 (msg["Authentication"]["serviceDescription"] ==
1215 msg["Authentication"]["authDescription"] == "network" and
1216 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1217 (msg["Authentication"]["workstation"] ==
1218 r"\\%s" % workstation) and
1219 (msg["Authentication"]["eventId"] ==
1220 EVT_ID_SUCCESSFUL_LOGON) and
1221 (msg["Authentication"]["logonType"] ==
# Target server and account come from the test environment.
1224 server = os.environ["SERVER"]
1225 user = os.environ["USERNAME"]
1226 password = os.environ["PASSWORD"]
# Trailing 2 is the last samlogon argument; presumably the logon level that
# selects a network logon - TODO confirm against rpcclient.
1227 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of rpcclient is not checked; only the logged messages are
# inspected below.
1229 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1231 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1232 messages = self.remove_netlogon_messages(messages)
1233 received = len(messages)
1235 (received == 5 or received == 6),
1236 "Did not receive the expected number of messages")
# Test a network SamLogon through rpcclient with a wrong password.
# The wait ends on a failed "Authentication" record with authDescription
# "network" and status NT_STATUS_WRONG_PASSWORD.
1238 def test_samlogon_network_bad_password(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1240 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1242 def isLastExpectedMessage(msg):
1243 return (msg["type"] == "Authentication" and
1244 (msg["Authentication"]["serviceDescription"] ==
1246 msg["Authentication"]["authDescription"] == "network" and
1247 (msg["Authentication"]["status"] ==
1248 "NT_STATUS_WRONG_PASSWORD") and
1249 (msg["Authentication"]["workstation"] ==
1250 r"\\%s" % workstation) and
1251 (msg["Authentication"]["eventId"] ==
1252 EVT_ID_UNSUCCESSFUL_LOGON) and
1253 (msg["Authentication"]["logonType"] ==
# Real server and user from the environment, but a deliberately wrong
# password.
1256 server = os.environ["SERVER"]
1257 user = os.environ["USERNAME"]
1258 password = "badPassword"
1259 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1261 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1263 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1264 messages = self.remove_netlogon_messages(messages)
1265 received = len(messages)
1267 (received == 5 or received == 6),
1268 "Did not receive the expected number of messages")
# Test a network SamLogon through rpcclient for an unknown user.
# The wait ends on a failed "Authentication" record with authDescription
# "network" and status NT_STATUS_NO_SUCH_USER.
1270 def test_samlogon_network_bad_user(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1272 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1274 def isLastExpectedMessage(msg):
1275 return ((msg["type"] == "Authentication") and
1276 (msg["Authentication"]["serviceDescription"] ==
1278 (msg["Authentication"]["authDescription"] == "network") and
1279 (msg["Authentication"]["status"] ==
1280 "NT_STATUS_NO_SUCH_USER") and
1281 (msg["Authentication"]["workstation"] ==
1282 r"\\%s" % workstation) and
1283 (msg["Authentication"]["eventId"] ==
1284 EVT_ID_UNSUCCESSFUL_LOGON) and
1285 (msg["Authentication"]["logonType"] ==
# Server and password come from the test environment.
1288 server = os.environ["SERVER"]
1290 password = os.environ["PASSWORD"]
1291 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1293 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1295 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1296 messages = self.remove_netlogon_messages(messages)
1297 received = len(messages)
1299 (received == 5 or received == 6),
1300 "Did not receive the expected number of messages")
# Test a successful network SamLogon through rpcclient where the logged
# password type must be MSCHAPv2. The wait ends on a successful
# (NT_STATUS_OK) "Authentication" record with authDescription "network".
1302 def test_samlogon_network_mschap(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1304 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1306 def isLastExpectedMessage(msg):
1307 return ((msg["type"] == "Authentication") and
1308 (msg["Authentication"]["serviceDescription"] ==
1310 (msg["Authentication"]["authDescription"] == "network") and
1311 (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1312 (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1313 (msg["Authentication"]["workstation"] ==
1314 r"\\%s" % workstation) and
1315 (msg["Authentication"]["eventId"] ==
1316 EVT_ID_SUCCESSFUL_LOGON) and
1317 (msg["Authentication"]["logonType"] ==
# Target server and account come from the test environment.
1320 server = os.environ["SERVER"]
1321 user = os.environ["USERNAME"]
1322 password = os.environ["PASSWORD"]
# The extra 0x00010000 argument is presumably the logon-parameter flag that
# makes the server treat the logon as MSCHAPv2 - TODO confirm.
1323 samlogon = "samlogon %s %s %s %d 0x00010000" % (
1324 user, password, workstation, 2)
# The exit status of rpcclient is not checked; only the logged messages are
# inspected below.
1326 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1328 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1329 messages = self.remove_netlogon_messages(messages)
1330 received = len(messages)
1332 (received == 5 or received == 6),
1333 "Did not receive the expected number of messages")
# Test a network SamLogon through rpcclient with a wrong password where the
# logged password type must be MSCHAPv2. The wait ends on a failed
# "Authentication" record with status NT_STATUS_WRONG_PASSWORD.
1335 def test_samlogon_network_mschap_bad_password(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1337 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1339 def isLastExpectedMessage(msg):
1340 return ((msg["type"] == "Authentication") and
1341 (msg["Authentication"]["serviceDescription"] ==
1343 (msg["Authentication"]["authDescription"] == "network") and
1344 (msg["Authentication"]["status"] ==
1345 "NT_STATUS_WRONG_PASSWORD") and
1346 (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1347 (msg["Authentication"]["workstation"] ==
1348 r"\\%s" % workstation) and
1349 (msg["Authentication"]["eventId"] ==
1350 EVT_ID_UNSUCCESSFUL_LOGON) and
1351 (msg["Authentication"]["logonType"] ==
# Real server and user from the environment, but a deliberately wrong
# password.
1354 server = os.environ["SERVER"]
1355 user = os.environ["USERNAME"]
1356 password = "badPassword"
# The extra 0x00010000 argument is presumably the logon-parameter flag that
# makes the server treat the logon as MSCHAPv2 - TODO confirm.
1357 samlogon = "samlogon %s %s %s %d 0x00010000" % (
1358 user, password, workstation, 2)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1360 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1362 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1363 messages = self.remove_netlogon_messages(messages)
1364 received = len(messages)
1366 (received == 5 or received == 6),
1367 "Did not receive the expected number of messages")
# Test a network SamLogon through rpcclient for an unknown user where the
# logged password type must be MSCHAPv2. The wait ends on a failed
# "Authentication" record with status NT_STATUS_NO_SUCH_USER.
1369 def test_samlogon_network_mschap_bad_user(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1371 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1373 def isLastExpectedMessage(msg):
1374 return ((msg["type"] == "Authentication") and
1375 (msg["Authentication"]["serviceDescription"] ==
1377 (msg["Authentication"]["authDescription"] == "network") and
1378 (msg["Authentication"]["status"] ==
1379 "NT_STATUS_NO_SUCH_USER") and
1380 (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1381 (msg["Authentication"]["workstation"] ==
1382 r"\\%s" % workstation) and
1383 (msg["Authentication"]["eventId"] ==
1384 EVT_ID_UNSUCCESSFUL_LOGON) and
1385 (msg["Authentication"]["logonType"] ==
# Server and password come from the test environment.
1388 server = os.environ["SERVER"]
1390 password = os.environ["PASSWORD"]
# The extra 0x00010000 argument is presumably the logon-parameter flag that
# makes the server treat the logon as MSCHAPv2 - TODO confirm.
1391 samlogon = "samlogon %s %s %s %d 0x00010000" % (
1392 user, password, workstation, 2)
# The exit status of rpcclient is not checked; the failure is verified only
# through the logged messages.
1394 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1396 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1397 messages = self.remove_netlogon_messages(messages)
1398 received = len(messages)
1400 (received == 5 or received == 6),
1401 "Did not receive the expected number of messages")
# Test a SamLogon issued through rpcclient after its "schannel" command.
# The wait ends on a successful "Authentication" record; afterwards an
# "Authorization" record is checked for authType "schannel", transport
# protection "SEAL" and a GUID-shaped session id.
1403 def test_samlogon_schannel_seal(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1405 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1407 def isLastExpectedMessage(msg):
1408 return ((msg["type"] == "Authentication") and
1409 (msg["Authentication"]["serviceDescription"] ==
1411 (msg["Authentication"]["authDescription"] == "network") and
1412 (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1413 (msg["Authentication"]["workstation"] ==
1414 r"\\%s" % workstation) and
1415 (msg["Authentication"]["eventId"] ==
1416 EVT_ID_SUCCESSFUL_LOGON) and
1417 (msg["Authentication"]["logonType"] ==
# Target server and account come from the test environment.
1420 server = os.environ["SERVER"]
1421 user = os.environ["USERNAME"]
1422 password = os.environ["PASSWORD"]
# Two rpcclient commands separated by ";": "schannel" first, then the
# samlogon itself (no explicit logon level is given here).
1423 samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
# The exit status of rpcclient is not checked; only the logged messages are
# inspected below.
1425 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1427 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1428 messages = self.remove_netlogon_messages(messages)
1429 received = len(messages)
1431 (received == 5 or received == 6),
1432 "Did not receive the expected number of messages")
1434 # Check the second to last message it should be an Authorization
# The DCE/RPC authorization must be reported as schannel with SEAL
# protection and carry a session id that is_guid() accepts.
1436 self.assertEquals("Authorization", msg["type"])
1437 self.assertEquals("DCE/RPC",
1438 msg["Authorization"]["serviceDescription"])
1439 self.assertEquals("schannel", msg["Authorization"]["authType"])
1440 self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1441 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1443 # Signed logons get promoted to sealed; this test ensures that
1444 # this behaviour is not removed accidentally.
# Test a SamLogon issued through rpcclient after its "schannelsign" command.
# Although signing is requested, the "Authorization" record is still
# required to report transport protection "SEAL" with authType "schannel".
1445 def test_samlogon_schannel_sign(self):
# Workstation name passed to samlogon and matched (with a leading "\\")
# against the logged record.
1447 workstation = "AuthLogTests"
# Predicate handed to waitForMessages(); judging by its name it identifies
# the last message the test waits for.
1449 def isLastExpectedMessage(msg):
1450 return ((msg["type"] == "Authentication") and
1451 (msg["Authentication"]["serviceDescription"] ==
1453 (msg["Authentication"]["authDescription"] == "network") and
1454 (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1455 (msg["Authentication"]["workstation"] ==
1456 r"\\%s" % workstation) and
1457 (msg["Authentication"]["eventId"] ==
1458 EVT_ID_SUCCESSFUL_LOGON) and
1459 (msg["Authentication"]["logonType"] ==
# Target server and account come from the test environment.
1462 server = os.environ["SERVER"]
1463 user = os.environ["USERNAME"]
1464 password = os.environ["PASSWORD"]
# Two rpcclient commands separated by ";": "schannelsign" first, then the
# samlogon itself (no explicit logon level is given here).
1465 samlogon = "schannelsign;samlogon %s %s %s" % (
1466 user, password, workstation)
# The exit status of rpcclient is not checked; only the logged messages are
# inspected below.
1468 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1470 messages = self.waitForMessages(isLastExpectedMessage)
# Drop netlogon messages before counting; either 5 or 6 remaining messages
# are accepted.
1471 messages = self.remove_netlogon_messages(messages)
1472 received = len(messages)
1474 (received == 5 or received == 6),
1475 "Did not receive the expected number of messages")
1477 # Check the second to last message it should be an Authorization
# The DCE/RPC authorization must be reported as schannel with SEAL
# protection and carry a session id that is_guid() accepts.
1479 self.assertEquals("Authorization", msg["type"])
1480 self.assertEquals("DCE/RPC",
1481 msg["Authorization"]["serviceDescription"])
1482 self.assertEquals("schannel", msg["Authorization"]["authType"])
1483 self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1484 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))