1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
21 from samba.dcerpc import srvsvc, dnsserver
23 from samba.samba3 import libsmb_samba_internal as libsmb
24 from samba.samba3 import param as s3param
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 from samba.dcerpc.windows_event_ids import (
32 EVT_ID_SUCCESSFUL_LOGON,
33 EVT_ID_UNSUCCESSFUL_LOGON,
35 EVT_LOGON_INTERACTIVE,
36 EVT_LOGON_NETWORK_CLEAR_TEXT
41 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
44 super(AuthLogTests, self).setUp()
45 self.remoteAddress = os.environ["CLIENT_IP"]
48 super(AuthLogTests, self).tearDown()
50 def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
52 # the SMB bindings rely on having a s3 loadparm
53 lp = self.get_loadparm()
54 s3_lp = s3param.get_context()
55 s3_lp.load(lp.configfile)
57 # Allow the testcase to skip SPNEGO or use NTLMv1
58 s3_lp.set("client use spnego", use_spnego)
59 s3_lp.set("client ntlmv2 auth", ntlmv2_auth)
61 return libsmb.Conn(self.server, "sysvol", lp=s3_lp, creds=creds,
62 force_smb1=force_smb1)
64 def _test_rpc_ncacn_np(self, authTypes, creds, service,
65 binding, protection, checkFunction):
def isLastExpectedMessage(msg):
    """Return True for the Authorization record that ends the exchange.

    The record must have a serviceDescription of "DCE/RPC" or the
    enclosing helper's ``service`` argument, an authType equal to
    ``authTypes[0]`` and a transportProtection equal to ``protection``.
    """
    # Anything that is not an Authorization record cannot be the last one.
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    described_as = authz["serviceDescription"]
    if described_as != "DCE/RPC" and described_as != service:
        return False
    return (authz["authType"] == authTypes[0] and
            authz["transportProtection"] == protection)
74 binding = "[%s]" % binding
76 if service == "dnsserver":
77 x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
80 elif service == "srvsvc":
81 x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
85 # The connection is passed to ensure the server
86 # messaging context stays up until all the messages have been received.
87 messages = self.waitForMessages(isLastExpectedMessage, x)
88 checkFunction(messages, authTypes, service, binding, protection)
90 def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
91 # Turn "[foo,bar]" into a list ("foo", "bar") to test
92 # lambda x: x removes anything that evaluates to False,
93 # including empty strings, so we handle "" as well
95 list(filter(lambda x: x, re.compile(r'[\[,\]]').split(binding)))
97 # Handle explicit smb2, smb1 or auto negotiation
98 if "smb2" in binding_list:
99 self.assertEqual(serviceDescription, "SMB2")
100 elif "smb1" in binding_list:
101 self.assertEqual(serviceDescription, "SMB")
103 self.assertIn(serviceDescription, ["SMB", "SMB2"])
105 def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
106 binding, protection):
108 expected_messages = len(authTypes)
109 self.assertEqual(expected_messages,
111 "Did not receive the expected number of messages")
113 # Check the first message it should be an Authentication
115 self.assertEqual("Authentication", msg["type"])
116 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
118 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
120 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
121 self._assert_ncacn_np_serviceDescription(
122 binding, msg["Authentication"]["serviceDescription"])
123 self.assertEqual(authTypes[1],
124 msg["Authentication"]["authDescription"])
126 # Check the second message it should be an Authorization
128 self.assertEqual("Authorization", msg["type"])
129 self._assert_ncacn_np_serviceDescription(
130 binding, msg["Authorization"]["serviceDescription"])
131 self.assertEqual(authTypes[2], msg["Authorization"]["authType"])
132 self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
133 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
135 # Check the third message it should be an Authentication
136 # if we are expecting 4 messages
137 if expected_messages == 4:
def checkServiceDescription(desc):
    """Accept "DCE/RPC" or the name of the service under test."""
    if desc == "DCE/RPC":
        return True
    # Otherwise only the service captured from the enclosing scope matches.
    return desc == service
142 self.assertEqual("Authentication", msg["type"])
143 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
145 checkServiceDescription(
146 msg["Authentication"]["serviceDescription"]))
148 self.assertEqual(authTypes[3],
149 msg["Authentication"]["authDescription"])
151 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
153 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
155 def rpc_ncacn_np_krb5_check(
163 expected_messages = len(authTypes)
164 self.assertEqual(expected_messages,
166 "Did not receive the expected number of messages")
168 # Check the first message it should be an Authentication
169 # This is almost certainly Authentication over UDP, and is probably
170 # returning message too big,
172 self.assertEqual("Authentication", msg["type"])
173 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
174 self.assertEqual("Kerberos KDC",
175 msg["Authentication"]["serviceDescription"])
176 self.assertEqual(authTypes[1],
177 msg["Authentication"]["authDescription"])
179 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
181 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
183 # Check the second message it should be an Authentication
184 # This is the TCP Authentication in response to the message too big
185 # response to the UDP Authentication
187 self.assertEqual("Authentication", msg["type"])
188 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
189 self.assertEqual("Kerberos KDC",
190 msg["Authentication"]["serviceDescription"])
191 self.assertEqual(authTypes[2],
192 msg["Authentication"]["authDescription"])
194 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
196 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
198 # Check the third message it should be an Authorization
200 self.assertEqual("Authorization", msg["type"])
201 self._assert_ncacn_np_serviceDescription(
202 binding, msg["Authorization"]["serviceDescription"])
203 self.assertEqual(authTypes[3], msg["Authorization"]["authType"])
204 self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
205 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
207 def test_rpc_ncacn_np_ntlm_dns_sign(self):
208 creds = self.insta_creds(template=self.get_credentials(),
209 kerberos_state=DONT_USE_KERBEROS)
210 self._test_rpc_ncacn_np(["NTLMSSP",
214 creds, "dnsserver", "sign", "SIGN",
215 self.rpc_ncacn_np_ntlm_check)
217 def test_rpc_ncacn_np_ntlm_srv_sign(self):
218 creds = self.insta_creds(template=self.get_credentials(),
219 kerberos_state=DONT_USE_KERBEROS)
220 self._test_rpc_ncacn_np(["NTLMSSP",
224 creds, "srvsvc", "sign", "SIGN",
225 self.rpc_ncacn_np_ntlm_check)
227 def test_rpc_ncacn_np_ntlm_dns(self):
228 creds = self.insta_creds(template=self.get_credentials(),
229 kerberos_state=DONT_USE_KERBEROS)
230 self._test_rpc_ncacn_np(["ncacn_np",
233 creds, "dnsserver", "", "SMB",
234 self.rpc_ncacn_np_ntlm_check)
236 def test_rpc_ncacn_np_ntlm_srv(self):
237 creds = self.insta_creds(template=self.get_credentials(),
238 kerberos_state=DONT_USE_KERBEROS)
239 self._test_rpc_ncacn_np(["ncacn_np",
242 creds, "srvsvc", "", "SMB",
243 self.rpc_ncacn_np_ntlm_check)
245 def test_rpc_ncacn_np_krb_dns_sign(self):
246 creds = self.insta_creds(template=self.get_credentials(),
247 kerberos_state=MUST_USE_KERBEROS)
248 self._test_rpc_ncacn_np(["krb5",
249 "ENC-TS Pre-authentication",
250 "ENC-TS Pre-authentication",
252 creds, "dnsserver", "sign", "SIGN",
253 self.rpc_ncacn_np_krb5_check)
255 def test_rpc_ncacn_np_krb_srv_sign(self):
256 creds = self.insta_creds(template=self.get_credentials(),
257 kerberos_state=MUST_USE_KERBEROS)
258 self._test_rpc_ncacn_np(["krb5",
259 "ENC-TS Pre-authentication",
260 "ENC-TS Pre-authentication",
262 creds, "srvsvc", "sign", "SIGN",
263 self.rpc_ncacn_np_krb5_check)
265 def test_rpc_ncacn_np_krb_dns(self):
266 creds = self.insta_creds(template=self.get_credentials(),
267 kerberos_state=MUST_USE_KERBEROS)
268 self._test_rpc_ncacn_np(["ncacn_np",
269 "ENC-TS Pre-authentication",
270 "ENC-TS Pre-authentication",
272 creds, "dnsserver", "", "SMB",
273 self.rpc_ncacn_np_krb5_check)
275 def test_rpc_ncacn_np_krb_dns_smb2(self):
276 creds = self.insta_creds(template=self.get_credentials(),
277 kerberos_state=MUST_USE_KERBEROS)
278 self._test_rpc_ncacn_np(["ncacn_np",
279 "ENC-TS Pre-authentication",
280 "ENC-TS Pre-authentication",
282 creds, "dnsserver", "smb2", "SMB",
283 self.rpc_ncacn_np_krb5_check)
285 def test_rpc_ncacn_np_krb_srv(self):
286 creds = self.insta_creds(template=self.get_credentials(),
287 kerberos_state=MUST_USE_KERBEROS)
288 self._test_rpc_ncacn_np(["ncacn_np",
289 "ENC-TS Pre-authentication",
290 "ENC-TS Pre-authentication",
292 creds, "srvsvc", "", "SMB",
293 self.rpc_ncacn_np_krb5_check)
295 def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
296 binding, protection, checkFunction):
def isLastExpectedMessage(msg):
    """Return True for the closing DCE/RPC Authorization record.

    Besides the "DCE/RPC" serviceDescription, the record's authType must
    equal ``authTypes[0]`` and its transportProtection must equal
    ``protection`` (both taken from the enclosing helper's arguments).
    """
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    if authz["serviceDescription"] != "DCE/RPC":
        return False
    if authz["authType"] != authTypes[0]:
        return False
    return authz["transportProtection"] == protection
304 binding = "[%s]" % binding
306 if service == "dnsserver":
307 conn = dnsserver.dnsserver(
308 "ncacn_ip_tcp:%s%s" % (self.server, binding),
311 elif service == "srvsvc":
312 conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
316 messages = self.waitForMessages(isLastExpectedMessage, conn)
317 checkFunction(messages, authTypes, service, binding, protection)
319 def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
320 binding, protection):
322 expected_messages = len(authTypes)
323 self.assertEqual(expected_messages,
325 "Did not receive the expected number of messages")
327 # Check the first message it should be an Authorization
329 self.assertEqual("Authorization", msg["type"])
330 self.assertEqual("DCE/RPC",
331 msg["Authorization"]["serviceDescription"])
332 self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
333 self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
334 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
336 # Check the second message it should be an Authentication
338 self.assertEqual("Authentication", msg["type"])
339 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
340 self.assertEqual("DCE/RPC",
341 msg["Authentication"]["serviceDescription"])
342 self.assertEqual(authTypes[2],
343 msg["Authentication"]["authDescription"])
345 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
347 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
349 def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
350 binding, protection):
352 expected_messages = len(authTypes)
353 self.assertEqual(expected_messages,
355 "Did not receive the expected number of messages")
357 # Check the first message it should be an Authorization
359 self.assertEqual("Authorization", msg["type"])
360 self.assertEqual("DCE/RPC",
361 msg["Authorization"]["serviceDescription"])
362 self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
363 self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
364 self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
366 # Check the second message it should be an Authentication
368 self.assertEqual("Authentication", msg["type"])
369 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
370 self.assertEqual("Kerberos KDC",
371 msg["Authentication"]["serviceDescription"])
372 self.assertEqual(authTypes[2],
373 msg["Authentication"]["authDescription"])
375 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
377 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
379 # Check the third message it should be an Authentication
381 self.assertEqual("Authentication", msg["type"])
382 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
383 self.assertEqual("Kerberos KDC",
384 msg["Authentication"]["serviceDescription"])
385 self.assertEqual(authTypes[2],
386 msg["Authentication"]["authDescription"])
388 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
390 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
392 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
393 creds = self.insta_creds(template=self.get_credentials(),
394 kerberos_state=DONT_USE_KERBEROS)
395 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
398 creds, "dnsserver", "sign", "SIGN",
399 self.rpc_ncacn_ip_tcp_ntlm_check)
401 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
402 creds = self.insta_creds(template=self.get_credentials(),
403 kerberos_state=MUST_USE_KERBEROS)
404 self._test_rpc_ncacn_ip_tcp(["krb5",
406 "ENC-TS Pre-authentication",
407 "ENC-TS Pre-authentication"],
408 creds, "dnsserver", "sign", "SIGN",
409 self.rpc_ncacn_ip_tcp_krb5_check)
411 def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
412 creds = self.insta_creds(template=self.get_credentials(),
413 kerberos_state=DONT_USE_KERBEROS)
414 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
417 creds, "dnsserver", "", "SIGN",
418 self.rpc_ncacn_ip_tcp_ntlm_check)
420 def test_rpc_ncacn_ip_tcp_krb5_dns(self):
421 creds = self.insta_creds(template=self.get_credentials(),
422 kerberos_state=MUST_USE_KERBEROS)
423 self._test_rpc_ncacn_ip_tcp(["krb5",
425 "ENC-TS Pre-authentication",
426 "ENC-TS Pre-authentication"],
427 creds, "dnsserver", "", "SIGN",
428 self.rpc_ncacn_ip_tcp_krb5_check)
430 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
431 creds = self.insta_creds(template=self.get_credentials(),
432 kerberos_state=DONT_USE_KERBEROS)
433 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
436 creds, "dnsserver", "connect", "NONE",
437 self.rpc_ncacn_ip_tcp_ntlm_check)
439 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
440 creds = self.insta_creds(template=self.get_credentials(),
441 kerberos_state=MUST_USE_KERBEROS)
442 self._test_rpc_ncacn_ip_tcp(["krb5",
444 "ENC-TS Pre-authentication",
445 "ENC-TS Pre-authentication"],
446 creds, "dnsserver", "connect", "NONE",
447 self.rpc_ncacn_ip_tcp_krb5_check)
449 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
450 creds = self.insta_creds(template=self.get_credentials(),
451 kerberos_state=DONT_USE_KERBEROS)
452 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
455 creds, "dnsserver", "seal", "SEAL",
456 self.rpc_ncacn_ip_tcp_ntlm_check)
458 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
459 creds = self.insta_creds(template=self.get_credentials(),
460 kerberos_state=MUST_USE_KERBEROS)
461 self._test_rpc_ncacn_ip_tcp(["krb5",
463 "ENC-TS Pre-authentication",
464 "ENC-TS Pre-authentication"],
465 creds, "dnsserver", "seal", "SEAL",
466 self.rpc_ncacn_ip_tcp_krb5_check)
def isLastExpectedMessage(msg):
    """Return True for the LDAP Authorization record (SIGN, krb5)."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    # Checked lazily and in this order, so a mismatch stops the scan.
    wanted = (("serviceDescription", "LDAP"),
              ("transportProtection", "SIGN"),
              ("authType", "krb5"))
    return all(authz[field] == value for field, value in wanted)
476 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
477 lp=self.get_loadparm(),
478 credentials=self.get_credentials())
480 messages = self.waitForMessages(isLastExpectedMessage)
483 "Did not receive the expected number of messages")
485 # Check the first message it should be an Authentication
487 self.assertEqual("Authentication", msg["type"])
488 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
489 self.assertEqual("Kerberos KDC",
490 msg["Authentication"]["serviceDescription"])
491 self.assertEqual("ENC-TS Pre-authentication",
492 msg["Authentication"]["authDescription"])
493 self.assertTrue(msg["Authentication"]["duration"] > 0)
495 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
497 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
499 # Check the second message it should be an Authentication
501 self.assertEqual("Authentication", msg["type"])
502 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
503 self.assertEqual("Kerberos KDC",
504 msg["Authentication"]["serviceDescription"])
505 self.assertEqual("ENC-TS Pre-authentication",
506 msg["Authentication"]["authDescription"])
507 self.assertTrue(msg["Authentication"]["duration"] > 0)
509 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
511 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
513 def test_ldap_ntlm(self):
def isLastExpectedMessage(msg):
    """Return True for the LDAP Authorization record (SEAL, NTLMSSP)."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    if authz["serviceDescription"] != "LDAP":
        return False
    if authz["transportProtection"] != "SEAL":
        return False
    return authz["authType"] == "NTLMSSP"
521 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
522 lp=self.get_loadparm(),
523 credentials=self.get_credentials())
525 messages = self.waitForMessages(isLastExpectedMessage)
528 "Did not receive the expected number of messages")
529 # Check the first message it should be an Authentication
531 self.assertEqual("Authentication", msg["type"])
532 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
533 self.assertEqual("LDAP",
534 msg["Authentication"]["serviceDescription"])
535 self.assertEqual("NTLMSSP", msg["Authentication"]["authDescription"])
536 self.assertTrue(msg["Authentication"]["duration"] > 0)
538 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
540 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
542 def test_ldap_simple_bind(self):
def isLastExpectedMessage(msg):
    """Return True for the LDAP Authorization record of a TLS simple bind."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    # Evaluated one by one, in order; the first mismatch ends the check.
    wanted = (("serviceDescription", "LDAP"),
              ("transportProtection", "TLS"),
              ("authType", "simple bind"))
    return all(authz[field] == value for field, value in wanted)
549 creds = self.insta_creds(template=self.get_credentials())
550 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
551 creds.get_username()))
553 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
554 lp=self.get_loadparm(),
557 messages = self.waitForMessages(isLastExpectedMessage)
560 "Did not receive the expected number of messages")
562 # Check the first message it should be an Authentication
564 self.assertEqual("Authentication", msg["type"])
565 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
566 self.assertEqual("LDAP",
567 msg["Authentication"]["serviceDescription"])
568 self.assertEqual("simple bind/TLS",
569 msg["Authentication"]["authDescription"])
571 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
573 EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
575 def test_ldap_simple_bind_bad_password(self):
def isLastExpectedMessage(msg):
    """Return True for the failed LDAP simple-bind Authentication record.

    The record must report NT_STATUS_WRONG_PASSWORD for a
    "simple bind/TLS" attempt, with the unsuccessful-logon event id and
    the clear-text network logon type.
    """
    if msg["type"] != "Authentication":
        return False
    authn = msg["Authentication"]
    if authn["serviceDescription"] != "LDAP":
        return False
    if authn["status"] != "NT_STATUS_WRONG_PASSWORD":
        return False
    if authn["authDescription"] != "simple bind/TLS":
        return False
    return (authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
            authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)
588 creds = self.insta_creds(template=self.get_credentials())
589 creds.set_password("badPassword")
590 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
591 creds.get_username()))
595 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
596 lp=self.get_loadparm(),
600 self.assertEqual(thrown, True)
602 messages = self.waitForMessages(isLastExpectedMessage)
605 "Did not receive the expected number of messages")
607 def test_ldap_simple_bind_bad_user(self):
def isLastExpectedMessage(msg):
    """Return True for the LDAP simple-bind failure with an unknown user.

    The record must report NT_STATUS_NO_SUCH_USER for a "simple bind/TLS"
    attempt, with the unsuccessful-logon event id and the clear-text
    network logon type.
    """
    if msg["type"] != "Authentication":
        return False
    authn = msg["Authentication"]
    if authn["serviceDescription"] != "LDAP":
        return False
    if authn["status"] != "NT_STATUS_NO_SUCH_USER":
        return False
    if authn["authDescription"] != "simple bind/TLS":
        return False
    return (authn["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
            authn["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT)
620 creds = self.insta_creds(template=self.get_credentials())
621 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
625 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
626 lp=self.get_loadparm(),
630 self.assertEqual(thrown, True)
632 messages = self.waitForMessages(isLastExpectedMessage)
635 "Did not receive the expected number of messages")
637 def test_ldap_simple_bind_unparseable_user(self):
def isLastExpectedMessage(msg):
    """Return True for the NT_STATUS_NO_SUCH_USER LDAP bind failure.

    Matches an Authentication record for a "simple bind/TLS" attempt
    against LDAP that carries the unsuccessful-logon event id and the
    clear-text network logon type.
    """
    if msg["type"] != "Authentication":
        return False
    record = msg["Authentication"]
    # Plain string fields first, in the same order as the log checks.
    for field, value in (("serviceDescription", "LDAP"),
                         ("status", "NT_STATUS_NO_SUCH_USER"),
                         ("authDescription", "simple bind/TLS")):
        if record[field] != value:
            return False
    if record["eventId"] != EVT_ID_UNSUCCESSFUL_LOGON:
        return False
    return record["logonType"] == EVT_LOGON_NETWORK_CLEAR_TEXT
650 creds = self.insta_creds(template=self.get_credentials())
651 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
655 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
656 lp=self.get_loadparm(),
660 self.assertEqual(thrown, True)
662 messages = self.waitForMessages(isLastExpectedMessage)
665 "Did not receive the expected number of messages")
668 # Note: as this test does not expect any messages it will
669 # time out in the call to self.waitForMessages.
670 # This is expected, but it will slow this test.
671 def test_ldap_anonymous_access_bind_only(self):
672 # Should be no logging for anonymous bind
673 # so receiving any message indicates a failure.
674 def isLastExpectedMessage(msg):
677 creds = self.insta_creds(template=self.get_credentials())
678 creds.set_anonymous()
680 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
681 lp=self.get_loadparm(),
684 messages = self.waitForMessages(isLastExpectedMessage)
687 "Did not receive the expected number of messages")
689 def test_ldap_anonymous_access(self):
def isLastExpectedMessage(msg):
    """Return True for the anonymous ("no bind") LDAP Authorization over TLS."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    # Compared lazily in this order; the first mismatch ends the check.
    wanted = (("serviceDescription", "LDAP"),
              ("transportProtection", "TLS"),
              ("account", "ANONYMOUS LOGON"),
              ("authType", "no bind"))
    return all(authz[field] == value for field, value in wanted)
697 creds = self.insta_creds(template=self.get_credentials())
698 creds.set_anonymous()
700 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
701 lp=self.get_loadparm(),
705 self.samdb.search(base=self.samdb.domain_dn())
706 self.fail("Expected an LdbError exception")
710 messages = self.waitForMessages(isLastExpectedMessage)
713 "Did not receive the expected number of messages")
def isLastExpectedMessage(msg):
    """Return True for the krb5 Authorization record of an SMB session.

    Any serviceDescription containing "SMB" is accepted, so both "SMB"
    and "SMB2" match.
    """
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    if "SMB" not in authz["serviceDescription"]:
        return False
    if authz["authType"] != "krb5":
        return False
    return authz["transportProtection"] == "SMB"
722 creds = self.insta_creds(template=self.get_credentials())
723 self.smb_connection(creds)
725 messages = self.waitForMessages(isLastExpectedMessage)
728 "Did not receive the expected number of messages")
729 # Check the first message it should be an Authentication
731 self.assertEqual("Authentication", msg["type"])
732 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
733 self.assertEqual("Kerberos KDC",
734 msg["Authentication"]["serviceDescription"])
735 self.assertEqual("ENC-TS Pre-authentication",
736 msg["Authentication"]["authDescription"])
737 self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
738 msg["Authentication"]["eventId"])
739 self.assertEqual(EVT_LOGON_NETWORK,
740 msg["Authentication"]["logonType"])
742 # Check the second message it should be an Authentication
744 self.assertEqual("Authentication", msg["type"])
745 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
746 self.assertEqual("Kerberos KDC",
747 msg["Authentication"]["serviceDescription"])
748 self.assertEqual("ENC-TS Pre-authentication",
749 msg["Authentication"]["authDescription"])
750 self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
751 msg["Authentication"]["eventId"])
752 self.assertEqual(EVT_LOGON_NETWORK,
753 msg["Authentication"]["logonType"])
755 def test_smb_bad_password(self):
756 def isLastExpectedMessage(msg):
757 return (msg["type"] == "Authentication" and
758 (msg["Authentication"]["serviceDescription"] ==
760 (msg["Authentication"]["status"] ==
761 "NT_STATUS_WRONG_PASSWORD") and
762 (msg["Authentication"]["authDescription"] ==
763 "ENC-TS Pre-authentication"))
765 creds = self.insta_creds(template=self.get_credentials())
766 creds.set_kerberos_state(MUST_USE_KERBEROS)
767 creds.set_password("badPassword")
771 self.smb_connection(creds)
772 except NTSTATUSError:
774 self.assertEqual(thrown, True)
776 messages = self.waitForMessages(isLastExpectedMessage)
779 "Did not receive the expected number of messages")
781 def test_smb_bad_user(self):
782 def isLastExpectedMessage(msg):
783 return (msg["type"] == "Authentication" and
784 (msg["Authentication"]["serviceDescription"] ==
786 (msg["Authentication"]["status"] ==
787 "NT_STATUS_NO_SUCH_USER") and
788 (msg["Authentication"]["authDescription"] ==
790 (msg["Authentication"]["eventId"] ==
791 EVT_ID_UNSUCCESSFUL_LOGON) and
792 (msg["Authentication"]["logonType"] ==
795 creds = self.insta_creds(template=self.get_credentials())
796 creds.set_kerberos_state(MUST_USE_KERBEROS)
797 creds.set_username("badUser")
801 self.smb_connection(creds)
802 except NTSTATUSError:
804 self.assertEqual(thrown, True)
806 messages = self.waitForMessages(isLastExpectedMessage)
809 "Did not receive the expected number of messages")
811 def test_smb1_anonymous(self):
def isLastExpectedMessage(msg):
    """Return True for the anonymous NTLMSSP Authorization record on SMB."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    # Compared lazily in this order; the first mismatch ends the check.
    wanted = (("serviceDescription", "SMB"),
              ("authType", "NTLMSSP"),
              ("account", "ANONYMOUS LOGON"),
              ("transportProtection", "SMB"))
    return all(authz[field] == value for field, value in wanted)
819 server = os.environ["SERVER"]
821 path = "//%s/IPC$" % server
823 call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
825 messages = self.waitForMessages(isLastExpectedMessage)
828 "Did not receive the expected number of messages")
830 # Check the first message it should be an Authentication
832 self.assertEqual("Authentication", msg["type"])
833 self.assertEqual("NT_STATUS_NO_SUCH_USER",
834 msg["Authentication"]["status"])
835 self.assertEqual("SMB",
836 msg["Authentication"]["serviceDescription"])
837 self.assertEqual("NTLMSSP",
838 msg["Authentication"]["authDescription"])
839 self.assertEqual("No-Password",
840 msg["Authentication"]["passwordType"])
841 self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
842 msg["Authentication"]["eventId"])
843 self.assertEqual(EVT_LOGON_NETWORK,
844 msg["Authentication"]["logonType"])
846 # Check the second message it should be an Authentication
848 self.assertEqual("Authentication", msg["type"])
849 self.assertEqual("NT_STATUS_OK",
850 msg["Authentication"]["status"])
851 self.assertEqual("SMB",
852 msg["Authentication"]["serviceDescription"])
853 self.assertEqual("NTLMSSP",
854 msg["Authentication"]["authDescription"])
855 self.assertEqual("No-Password",
856 msg["Authentication"]["passwordType"])
857 self.assertEqual("ANONYMOUS LOGON",
858 msg["Authentication"]["becameAccount"])
859 self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
860 msg["Authentication"]["eventId"])
861 self.assertEqual(EVT_LOGON_NETWORK,
862 msg["Authentication"]["logonType"])
864 def test_smb2_anonymous(self):
def isLastExpectedMessage(msg):
    """Return True for the anonymous NTLMSSP Authorization record on SMB2."""
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    if authz["serviceDescription"] != "SMB2":
        return False
    if authz["authType"] != "NTLMSSP":
        return False
    if authz["account"] != "ANONYMOUS LOGON":
        return False
    return authz["transportProtection"] == "SMB"
872 server = os.environ["SERVER"]
874 path = "//%s/IPC$" % server
876 call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
878 messages = self.waitForMessages(isLastExpectedMessage)
881 "Did not receive the expected number of messages")
883 # Check the first message it should be an Authentication
885 self.assertEqual("Authentication", msg["type"])
886 self.assertEqual("NT_STATUS_NO_SUCH_USER",
887 msg["Authentication"]["status"])
888 self.assertEqual("SMB2",
889 msg["Authentication"]["serviceDescription"])
890 self.assertEqual("NTLMSSP",
891 msg["Authentication"]["authDescription"])
892 self.assertEqual("No-Password",
893 msg["Authentication"]["passwordType"])
894 self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
895 msg["Authentication"]["eventId"])
896 self.assertEqual(EVT_LOGON_NETWORK,
897 msg["Authentication"]["logonType"])
899 # Check the second message it should be an Authentication
901 self.assertEqual("Authentication", msg["type"])
902 self.assertEqual("NT_STATUS_OK",
903 msg["Authentication"]["status"])
904 self.assertEqual("SMB2",
905 msg["Authentication"]["serviceDescription"])
906 self.assertEqual("NTLMSSP",
907 msg["Authentication"]["authDescription"])
908 self.assertEqual("No-Password",
909 msg["Authentication"]["passwordType"])
910 self.assertEqual("ANONYMOUS LOGON",
911 msg["Authentication"]["becameAccount"])
912 self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
913 msg["Authentication"]["eventId"])
914 self.assertEqual(EVT_LOGON_NETWORK,
915 msg["Authentication"]["logonType"])
917 def test_smb_no_krb_spnego(self):
def isLastExpectedMessage(msg):
    """Return True for the NTLMSSP Authorization record of an SMB session.

    Any serviceDescription containing "SMB" is accepted, so both "SMB"
    and "SMB2" match.
    """
    if msg["type"] != "Authorization":
        return False
    authz = msg["Authorization"]
    if "SMB" not in authz["serviceDescription"]:
        return False
    if authz["authType"] != "NTLMSSP":
        return False
    return authz["transportProtection"] == "SMB"
924 creds = self.insta_creds(template=self.get_credentials(),
925 kerberos_state=DONT_USE_KERBEROS)
926 self.smb_connection(creds)
928 messages = self.waitForMessages(isLastExpectedMessage)
931 "Did not receive the expected number of messages")
932 # Check the first message it should be an Authentication
934 self.assertEqual("Authentication", msg["type"])
935 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
936 self.assertIn(msg["Authentication"]["serviceDescription"],
938 self.assertEqual("NTLMSSP",
939 msg["Authentication"]["authDescription"])
940 self.assertEqual("NTLMv2",
941 msg["Authentication"]["passwordType"])
942 self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
943 msg["Authentication"]["eventId"])
944 self.assertEqual(EVT_LOGON_NETWORK,
945 msg["Authentication"]["logonType"])
947 def test_smb_no_krb_spnego_bad_password(self):
948 def isLastExpectedMessage(msg):
949 return (msg["type"] == "Authentication" and
950 "SMB" in msg["Authentication"]["serviceDescription"] and
951 msg["Authentication"]["authDescription"] == "NTLMSSP" and
952 msg["Authentication"]["passwordType"] == "NTLMv2" and
953 (msg["Authentication"]["status"] ==
954 "NT_STATUS_WRONG_PASSWORD") and
955 (msg["Authentication"]["eventId"] ==
956 EVT_ID_UNSUCCESSFUL_LOGON) and
957 (msg["Authentication"]["logonType"] ==
960 creds = self.insta_creds(template=self.get_credentials(),
961 kerberos_state=DONT_USE_KERBEROS)
962 creds.set_password("badPassword")
966 self.smb_connection(creds)
967 except NTSTATUSError:
969 self.assertEqual(thrown, True)
971 messages = self.waitForMessages(isLastExpectedMessage)
974 "Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
    """Check the auth log for an SMB logon that fails on an unknown user.

    Connects with Kerberos disabled and the user name overridden with
    "badUser", requires the connection to raise NTSTATUSError, then
    waits for the NTLMSSP/NTLMv2 Authentication record reporting
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return ("SMB" in auth["serviceDescription"] and
                auth["authDescription"] == "NTLMSSP" and
                auth["passwordType"] == "NTLMv2" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                # NOTE(review): the compared constant was lost from the
                # listing; EVT_LOGON_NETWORK assumed -- confirm.
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # The server must reject the connection attempt.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    # NOTE(review): the expected count was lost from the listing;
    # 1 assumed (a lone failed Authentication record) -- confirm.
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
    """Check the auth log for a successful bare-NTLM (NTLMv1) SMB logon.

    Connects with Kerberos disabled, waits for the "SMB" / "bare-NTLM"
    Authorization record, then verifies the fields of the first logged
    message, which must be the successful Authentication.
    """
    def isLastExpectedMessage(msg):
        # Only Authorization records carry the fields inspected below.
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "bare-NTLM" and
                authz["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    # NOTE(review): the three keyword arguments were lost from the
    # listing; values assumed from the test name and the parameters of
    # smb_connection() -- confirm.
    self.smb_connection(creds,
                        force_smb1=True,
                        ntlmv2_auth="no",
                        use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    # NOTE(review): the expected count was lost from the listing;
    # 3 assumed -- confirm.
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")
    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("SMB",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual("bare-NTLM",
                     msg["Authentication"]["authDescription"])
    self.assertEqual("NTLMv1",
                     msg["Authentication"]["passwordType"])
    self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                     msg["Authentication"]["eventId"])
    self.assertEqual(EVT_LOGON_NETWORK,
                     msg["Authentication"]["logonType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
    """Check the auth log for a bare-NTLM SMB logon with a bad password.

    Connects with Kerberos disabled and the password overridden with
    "badPassword", requires the connection to raise NTSTATUSError, then
    waits for the bare-NTLM/NTLMv1 Authentication record reporting
    NT_STATUS_WRONG_PASSWORD.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                # NOTE(review): the compared constant was lost from the
                # listing; EVT_LOGON_NETWORK assumed -- confirm.
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password("badPassword")

    # The server must reject the connection attempt.
    # NOTE(review): the three keyword arguments were lost from the
    # listing; values assumed from the test name and the parameters of
    # smb_connection() -- confirm.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    # NOTE(review): the expected count was lost from the listing;
    # 1 assumed (a lone failed Authentication record) -- confirm.
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
    """Check the auth log for a bare-NTLM SMB logon with an unknown user.

    Connects with Kerberos disabled and the user name overridden with
    "badUser", requires the connection to raise NTSTATUSError, then
    waits for the bare-NTLM/NTLMv1 Authentication record reporting
    NT_STATUS_NO_SUCH_USER.
    """
    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        return (auth["serviceDescription"] == "SMB" and
                auth["authDescription"] == "bare-NTLM" and
                auth["passwordType"] == "NTLMv1" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                # NOTE(review): the compared constant was lost from the
                # listing; EVT_LOGON_NETWORK assumed -- confirm.
                auth["logonType"] == EVT_LOGON_NETWORK)

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username("badUser")

    # The server must reject the connection attempt.
    # NOTE(review): the three keyword arguments were lost from the
    # listing; values assumed from the test name and the parameters of
    # smb_connection() -- confirm.
    with self.assertRaises(NTSTATUSError):
        self.smb_connection(creds,
                            force_smb1=True,
                            ntlmv2_auth="no",
                            use_spnego="no")

    messages = self.waitForMessages(isLastExpectedMessage)
    # NOTE(review): the expected count was lost from the listing;
    # 1 assumed (a lone failed Authentication record) -- confirm.
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_samlogon_interactive(self):
    """Check the auth log for a successful interactive samlogon.

    Runs rpcclient's "samlogon" command with logon level 1 using the
    USERNAME/PASSWORD environment credentials against SERVER, waits for
    the matching successful Authentication record, and checks how many
    messages remain once netlogon messages are filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" and "interactive" literals were
        # lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient's exit status is not checked; the log is the oracle.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_password(self):
    """Check the auth log for an interactive samlogon with a bad password.

    Runs rpcclient's "samlogon" command with logon level 1 for the
    USERNAME environment user and the password "badPassword", waits for
    the Authentication record reporting NT_STATUS_WRONG_PASSWORD, and
    checks how many messages remain once netlogon messages are filtered
    out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" and "interactive" literals were
        # lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_user(self):
    """Check the auth log for an interactive samlogon with an unknown user.

    Runs rpcclient's "samlogon" command with logon level 1 for a
    non-existent user, waits for the Authentication record reporting
    NT_STATUS_NO_SUCH_USER, and checks how many messages remain once
    netlogon messages are filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" and "interactive" literals were
        # lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "interactive" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_INTERACTIVE)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was lost from the listing;
    # "badUser" assumed -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network(self):
    """Check the auth log for a successful network samlogon.

    Runs rpcclient's "samlogon" command with logon level 2 using the
    USERNAME/PASSWORD environment credentials against SERVER, waits for
    the matching successful Authentication record, and checks how many
    messages remain once netlogon messages are filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient's exit status is not checked; the log is the oracle.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_password(self):
    """Check the auth log for a network samlogon with a bad password.

    Runs rpcclient's "samlogon" command with logon level 2 for the
    USERNAME environment user and the password "badPassword", waits for
    the Authentication record reporting NT_STATUS_WRONG_PASSWORD, and
    checks how many messages remain once netlogon messages are filtered
    out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_user(self):
    """Check the auth log for a network samlogon with an unknown user.

    Runs rpcclient's "samlogon" command with logon level 2 for a
    non-existent user, waits for the Authentication record reporting
    NT_STATUS_NO_SUCH_USER, and checks how many messages remain once
    netlogon messages are filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was lost from the listing;
    # "badUser" assumed -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap(self):
    """Check the auth log for a successful MSCHAPv2 network samlogon.

    Runs rpcclient's "samlogon" command with logon level 2 and the
    extra argument 0x00010000, waits for the successful Authentication
    record whose passwordType is "MSCHAPv2", and checks how many
    messages remain once netlogon messages are filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient's exit status is not checked; the log is the oracle.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_password(self):
    """Check the auth log for an MSCHAPv2 samlogon with a bad password.

    Runs rpcclient's "samlogon" command with logon level 2, the extra
    argument 0x00010000 and the password "badPassword", waits for the
    "MSCHAPv2" Authentication record reporting NT_STATUS_WRONG_PASSWORD,
    and checks how many messages remain once netlogon messages are
    filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_WRONG_PASSWORD" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_user(self):
    """Check the auth log for an MSCHAPv2 samlogon with an unknown user.

    Runs rpcclient's "samlogon" command with logon level 2 and the
    extra argument 0x00010000 for a non-existent user, waits for the
    "MSCHAPv2" Authentication record reporting NT_STATUS_NO_SUCH_USER,
    and checks how many messages remain once netlogon messages are
    filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_NO_SUCH_USER" and
                auth["passwordType"] == "MSCHAPv2" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_UNSUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was lost from the listing;
    # "badUser" assumed -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient is expected to fail here; its exit status is ignored.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")
def test_samlogon_schannel_seal(self):
    """Check the auth log for a samlogon over a sealed schannel.

    Runs rpcclient's "schannel;samlogon" command sequence using the
    USERNAME/PASSWORD environment credentials, waits for the successful
    Authentication record, checks the message count once netlogon
    messages are filtered out, and verifies that the second to last
    message is a DCE/RPC schannel Authorization with SEAL protection
    and a GUID session id.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)

    # rpcclient's exit status is not checked; the log is the oracle.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")

    # Check the second to last message it should be an Authorization
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual("schannel", msg["Authorization"]["authType"])
    self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
# Signed logons get promoted to sealed; this test ensures that
# this behaviour is not removed accidentally.
def test_samlogon_schannel_sign(self):
    """Check the auth log for a samlogon over a signed schannel.

    Runs rpcclient's "schannelsign;samlogon" command sequence using the
    USERNAME/PASSWORD environment credentials, waits for the successful
    Authentication record, checks the message count once netlogon
    messages are filtered out, and verifies that the second to last
    message is a DCE/RPC schannel Authorization whose transport
    protection is reported as SEAL, with a GUID session id.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication records carry the fields inspected below.
        if msg["type"] != "Authentication":
            return False
        auth = msg["Authentication"]
        # NOTE(review): the "SamLogon" literal and the logonType constant
        # were lost from the listing and are assumed -- confirm.
        return (auth["serviceDescription"] == "SamLogon" and
                auth["authDescription"] == "network" and
                auth["status"] == "NT_STATUS_OK" and
                auth["workstation"] == (r"\\%s" % workstation) and
                auth["eventId"] == EVT_ID_SUCCESSFUL_LOGON and
                auth["logonType"] == EVT_LOGON_NETWORK)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannelsign;samlogon %s %s %s" % (
        user, password, workstation)

    # rpcclient's exit status is not checked; the log is the oracle.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    received = len(messages)
    self.assertIn(received,
                  (4, 5),
                  "Did not receive the expected number of messages")

    # Check the second to last message it should be an Authorization
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    self.assertEqual("DCE/RPC",
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual("schannel", msg["Authorization"]["authType"])
    # A sign-only request is still expected to be logged as SEAL.
    self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))