1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
21 from samba import auth
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 from samba.dcerpc import srvsvc, dnsserver
30 from samba.samdb import SamDB
31 import samba.tests.auth_log_base
32 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
33 from samba import NTSTATUSError
34 from subprocess import call
35 from ldb import LdbError
37 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
# Run the AuthLogTestBase setup first, then record the client address taken
# from the CLIENT_IP environment variable (a missing variable raises KeyError).
40 super(AuthLogTests, self).setUp()
41 self.remoteAddress = os.environ["CLIENT_IP"]
# Delegate teardown to the AuthLogTestBase implementation.
44 super(AuthLogTests, self).tearDown()
# Shared driver for the DCE/RPC-over-named-pipe (ncacn_np) cases: connect to
# the requested RPC service, collect the auth log messages that result, and
# pass them to checkFunction for the per-message assertions.
#   authTypes     -- expected auth type strings; authTypes[0] is what the
#                    final Authorization message must report as authType
#   creds         -- credentials built by the calling test
#   service       -- "dnsserver" or "srvsvc" (the only branches visible here)
#   binding       -- binding option, wrapped in "[...]" before use
#   protection    -- transportProtection expected on the final Authorization
#   checkFunction -- callable(messages, authTypes, service, binding,
#                    protection) performing the detailed checks
48 def _test_rpc_ncacn_np(self, authTypes, creds, service,
49 binding, protection, checkFunction):
# Predicate handed to waitForMessages: true for the Authorization record that
# ends the expected sequence. The serviceDescription may be either the generic
# "DCE/RPC" or the service name itself.
50 def isLastExpectedMessage(msg):
51 return (msg["type"] == "Authorization" and
52 (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
53 msg["Authorization"]["serviceDescription"] == service) and
54 msg["Authorization"]["authType"] == authTypes[0] and
55 msg["Authorization"]["transportProtection"] == protection)
58 binding = "[%s]" % binding
60 if service == "dnsserver":
61 x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
64 elif service == "srvsvc":
65 x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
69 # The connection is passed to ensure the server
70 # messaging context stays up until all the messages have been received.
71 messages = self.waitForMessages(isLastExpectedMessage, x)
72 checkFunction(messages, authTypes, service, binding, protection)
# Per-message assertions for NTLM over ncacn_np. The number of messages must
# equal len(authTypes). The SMB session is checked first (a successful
# Authentication followed by an Authorization whose transportProtection is
# "SMB"); only when four messages are expected is a further DCE/RPC-level
# Authentication checked against authTypes[3].
# NOTE(review): assertEquals is the deprecated alias of assertEqual.
74 def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
77 expected_messages = len(authTypes)
78 self.assertEquals(expected_messages,
80 "Did not receive the expected number of messages")
82 # Check the first message it should be an Authentication
84 self.assertEquals("Authentication", msg["type"])
85 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
86 self.assertEquals("SMB",
87 msg["Authentication"]["serviceDescription"])
88 self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
90 # Check the second message it should be an Authorization
92 self.assertEquals("Authorization", msg["type"])
93 self.assertEquals("SMB",
94 msg["Authorization"]["serviceDescription"])
95 self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
96 self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
98 # Check the third message it should be an Authentication
99 # if we are expecting 4 messages
100 if expected_messages == 4:
# Accept either the generic "DCE/RPC" description or the service name.
101 def checkServiceDescription(desc):
102 return (desc == "DCE/RPC" or desc == service)
105 self.assertEquals("Authentication", msg["type"])
106 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
108 checkServiceDescription(msg["Authentication"]["serviceDescription"]))
110 self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
# Per-message assertions for Kerberos over ncacn_np. Two successful
# "Kerberos KDC" Authentication records are expected before the SMB-level
# Authorization; the expected serviceDescription of that Authorization is
# "SMB2" when the binding is "[smb2]" and "SMB" otherwise.
112 def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
114 expected_messages = len(authTypes)
115 self.assertEquals(expected_messages,
117 "Did not receive the expected number of messages")
119 # Check the first message it should be an Authentication
120 # This is almost certainly Authentication over UDP, and is probably
121 # returning message too big,
123 self.assertEquals("Authentication", msg["type"])
124 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
125 self.assertEquals("Kerberos KDC",
126 msg["Authentication"]["serviceDescription"])
127 self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
129 # Check the second message it should be an Authentication
130 # This is the TCP Authentication in response to the message too big
131 # response to the UDP Authentication
133 self.assertEquals("Authentication", msg["type"])
134 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
135 self.assertEquals("Kerberos KDC",
136 msg["Authentication"]["serviceDescription"])
137 self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
139 # Check the third message it should be an Authorization
141 self.assertEquals("Authorization", msg["type"])
142 serviceDescription = "SMB"
# NOTE(review): leftover debug output written as a Python 2 print statement;
# it is a syntax error under Python 3.
143 print "binding %s" % binding
144 if binding == "[smb2]":
145 serviceDescription = "SMB2"
147 self.assertEquals(serviceDescription,
148 msg["Authorization"]["serviceDescription"])
149 self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
150 self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
# NTLM (Kerberos disabled) to dnsserver over ncacn_np with the "sign" binding
# option: the final Authorization must report authType "NTLMSSP" and
# transportProtection "SIGN".
153 def test_rpc_ncacn_np_ntlm_dns_sign(self):
154 creds = self.insta_creds(template=self.get_credentials(),
155 kerberos_state=DONT_USE_KERBEROS)
156 self._test_rpc_ncacn_np(["NTLMSSP",
160 creds, "dnsserver", "sign", "SIGN",
161 self.rpc_ncacn_np_ntlm_check)
# Same as the dnsserver "sign" case but against srvsvc: NTLM only, final
# Authorization expected with authType "NTLMSSP" and protection "SIGN".
163 def test_rpc_ncacn_np_ntlm_srv_sign(self):
164 creds = self.insta_creds(template=self.get_credentials(),
165 kerberos_state=DONT_USE_KERBEROS)
166 self._test_rpc_ncacn_np(["NTLMSSP",
170 creds, "srvsvc", "sign", "SIGN",
171 self.rpc_ncacn_np_ntlm_check)
# NTLM to dnsserver over ncacn_np with no binding option. The final
# Authorization is expected with authType "ncacn_np" and transportProtection
# "SMB" -- presumably because the RPC layer relies on the already
# authenticated SMB session; confirm against the server's logging code.
173 def test_rpc_ncacn_np_ntlm_dns(self):
174 creds = self.insta_creds(template=self.get_credentials(),
175 kerberos_state=DONT_USE_KERBEROS)
176 self._test_rpc_ncacn_np(["ncacn_np",
179 creds, "dnsserver", "", "SMB",
180 self.rpc_ncacn_np_ntlm_check)
# NTLM to srvsvc over ncacn_np with no binding option; the final
# Authorization is expected with authType "ncacn_np" and protection "SMB".
182 def test_rpc_ncacn_np_ntlm_srv(self):
183 creds = self.insta_creds(template=self.get_credentials(),
184 kerberos_state=DONT_USE_KERBEROS)
185 self._test_rpc_ncacn_np(["ncacn_np",
188 creds, "srvsvc", "", "SMB",
189 self.rpc_ncacn_np_ntlm_check)
# Kerberos-only to dnsserver over ncacn_np with "sign": the final
# Authorization must report authType "krb5" and protection "SIGN", preceded by
# the KDC pre-authentication records checked in rpc_ncacn_np_krb5_check.
191 def test_rpc_ncacn_np_krb_dns_sign(self):
192 creds = self.insta_creds(template=self.get_credentials(),
193 kerberos_state=MUST_USE_KERBEROS)
194 self._test_rpc_ncacn_np(["krb5",
195 "ENC-TS Pre-authentication",
196 "ENC-TS Pre-authentication",
198 creds, "dnsserver", "sign", "SIGN",
199 self.rpc_ncacn_np_krb5_check)
# Kerberos-only to srvsvc over ncacn_np with "sign"; expects a final
# Authorization with authType "krb5" and protection "SIGN".
201 def test_rpc_ncacn_np_krb_srv_sign(self):
202 creds = self.insta_creds(template=self.get_credentials(),
203 kerberos_state=MUST_USE_KERBEROS)
204 self._test_rpc_ncacn_np(["krb5",
205 "ENC-TS Pre-authentication",
206 "ENC-TS Pre-authentication",
208 creds, "srvsvc", "sign", "SIGN",
209 self.rpc_ncacn_np_krb5_check)
# Kerberos-only to dnsserver over ncacn_np with no binding option; the final
# Authorization is expected with authType "ncacn_np" and protection "SMB".
211 def test_rpc_ncacn_np_krb_dns(self):
212 creds = self.insta_creds(template=self.get_credentials(),
213 kerberos_state=MUST_USE_KERBEROS)
214 self._test_rpc_ncacn_np(["ncacn_np",
215 "ENC-TS Pre-authentication",
216 "ENC-TS Pre-authentication",
218 creds, "dnsserver", "", "SMB",
219 self.rpc_ncacn_np_krb5_check)
# As test_rpc_ncacn_np_krb_dns but with the "smb2" binding option, which makes
# rpc_ncacn_np_krb5_check expect serviceDescription "SMB2" on the SMB-level
# Authorization.
221 def test_rpc_ncacn_np_krb_dns_smb2(self):
222 creds = self.insta_creds(template=self.get_credentials(),
223 kerberos_state=MUST_USE_KERBEROS)
224 self._test_rpc_ncacn_np(["ncacn_np",
225 "ENC-TS Pre-authentication",
226 "ENC-TS Pre-authentication",
228 creds, "dnsserver", "smb2", "SMB",
229 self.rpc_ncacn_np_krb5_check)
# Kerberos-only to srvsvc over ncacn_np with no binding option; the final
# Authorization is expected with authType "ncacn_np" and protection "SMB".
231 def test_rpc_ncacn_np_krb_srv(self):
232 creds = self.insta_creds(template=self.get_credentials(),
233 kerberos_state=MUST_USE_KERBEROS)
234 self._test_rpc_ncacn_np(["ncacn_np",
235 "ENC-TS Pre-authentication",
236 "ENC-TS Pre-authentication",
238 creds, "srvsvc", "", "SMB",
239 self.rpc_ncacn_np_krb5_check)
# Shared driver for the DCE/RPC-over-TCP (ncacn_ip_tcp) cases. Parameters have
# the same meaning as in _test_rpc_ncacn_np: authTypes[0] and protection
# describe the final Authorization, checkFunction verifies the full sequence.
241 def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
242 binding, protection, checkFunction):
# Unlike the named-pipe variant, only the "DCE/RPC" serviceDescription is
# accepted as the terminating Authorization.
243 def isLastExpectedMessage(msg):
244 return (msg["type"] == "Authorization" and
245 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
246 msg["Authorization"]["authType"] == authTypes[0] and
247 msg["Authorization"]["transportProtection"] == protection)
250 binding = "[%s]" % binding
252 if service == "dnsserver":
253 conn = dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
256 elif service == "srvsvc":
257 conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
# The connection object is passed along with the predicate, as in the
# named-pipe driver.
262 messages = self.waitForMessages(isLastExpectedMessage, conn)
263 checkFunction(messages, authTypes, service, binding, protection)
# Per-message assertions for NTLM over ncacn_ip_tcp. The sequence starts with
# a DCE/RPC Authorization whose transportProtection is "NONE" (authType taken
# from authTypes[1]), followed by a successful DCE/RPC Authentication whose
# authDescription is authTypes[2].
265 def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
266 binding, protection):
268 expected_messages = len(authTypes)
269 self.assertEquals(expected_messages,
271 "Did not receive the expected number of messages")
273 # Check the first message it should be an Authorization
275 self.assertEquals("Authorization", msg["type"])
276 self.assertEquals("DCE/RPC",
277 msg["Authorization"]["serviceDescription"])
278 self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
279 self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
281 # Check the second message it should be an Authentication
283 self.assertEquals("Authentication", msg["type"])
284 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
285 self.assertEquals("DCE/RPC",
286 msg["Authentication"]["serviceDescription"])
287 self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
# Per-message assertions for Kerberos over ncacn_ip_tcp: an initial DCE/RPC
# Authorization with transportProtection "NONE", then two successful
# "Kerberos KDC" Authentication records.
289 def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
290 binding, protection):
292 expected_messages = len(authTypes)
293 self.assertEquals(expected_messages,
295 "Did not receive the expected number of messages")
297 # Check the first message it should be an Authorization
299 self.assertEquals("Authorization", msg["type"])
300 self.assertEquals("DCE/RPC",
301 msg["Authorization"]["serviceDescription"])
302 self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
303 self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
305 # Check the second message it should be an Authentication
307 self.assertEquals("Authentication", msg["type"])
308 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
309 self.assertEquals("Kerberos KDC",
310 msg["Authentication"]["serviceDescription"])
311 self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
313 # Check the third message it should be an Authentication
315 self.assertEquals("Authentication", msg["type"])
316 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
317 self.assertEquals("Kerberos KDC",
318 msg["Authentication"]["serviceDescription"])
# NOTE(review): the third message is compared with authTypes[2] again rather
# than authTypes[3]. The krb5 callers appear to pass the same string in both
# positions, so the outcome is the same -- confirm the index is intended.
319 self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
# NTLM (Kerberos disabled) to dnsserver over ncacn_ip_tcp with "sign": the
# final Authorization must report authType "NTLMSSP" and protection "SIGN".
321 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
322 creds = self.insta_creds(template=self.get_credentials(),
323 kerberos_state=DONT_USE_KERBEROS)
324 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
327 creds, "dnsserver", "sign", "SIGN",
328 self.rpc_ncacn_ip_tcp_ntlm_check)
# Kerberos-only to dnsserver over ncacn_ip_tcp with "sign": the final
# Authorization must report authType "krb5" and protection "SIGN".
330 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
331 creds = self.insta_creds(template=self.get_credentials(),
332 kerberos_state=MUST_USE_KERBEROS)
333 self._test_rpc_ncacn_ip_tcp(["krb5",
335 "ENC-TS Pre-authentication",
336 "ENC-TS Pre-authentication"],
337 creds, "dnsserver", "sign", "SIGN",
338 self.rpc_ncacn_ip_tcp_krb5_check)
# NTLM to dnsserver over ncacn_ip_tcp with an empty binding option. The
# expected transportProtection is still "SIGN", i.e. the same result as the
# explicit "sign" case.
340 def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
341 creds = self.insta_creds(template=self.get_credentials(),
342 kerberos_state=DONT_USE_KERBEROS)
343 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
346 creds, "dnsserver", "", "SIGN",
347 self.rpc_ncacn_ip_tcp_ntlm_check)
# Kerberos-only to dnsserver over ncacn_ip_tcp with an empty binding option;
# the expected transportProtection is "SIGN", as in the explicit "sign" case.
349 def test_rpc_ncacn_ip_tcp_krb5_dns(self):
350 creds = self.insta_creds(template=self.get_credentials(),
351 kerberos_state=MUST_USE_KERBEROS)
352 self._test_rpc_ncacn_ip_tcp(["krb5",
354 "ENC-TS Pre-authentication",
355 "ENC-TS Pre-authentication"],
356 creds, "dnsserver", "", "SIGN",
357 self.rpc_ncacn_ip_tcp_krb5_check)
# NTLM to dnsserver over ncacn_ip_tcp with the "connect" binding option. The
# session is authenticated but the log is expected to record
# transportProtection "NONE", which is the value a log consumer would key on
# to find RPC sessions without signing or sealing.
359 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
360 creds = self.insta_creds(template=self.get_credentials(),
361 kerberos_state=DONT_USE_KERBEROS)
362 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
365 creds, "dnsserver", "connect", "NONE",
366 self.rpc_ncacn_ip_tcp_ntlm_check)
# Kerberos-only variant of the "connect" case: authType "krb5" with
# transportProtection "NONE" expected on the final Authorization.
368 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
369 creds = self.insta_creds(template=self.get_credentials(),
370 kerberos_state=MUST_USE_KERBEROS)
371 self._test_rpc_ncacn_ip_tcp(["krb5",
373 "ENC-TS Pre-authentication",
374 "ENC-TS Pre-authentication"],
375 creds, "dnsserver", "connect", "NONE",
376 self.rpc_ncacn_ip_tcp_krb5_check)
# NTLM to dnsserver over ncacn_ip_tcp with "seal": the final Authorization
# must report authType "NTLMSSP" and transportProtection "SEAL".
378 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
379 creds = self.insta_creds(template=self.get_credentials(),
380 kerberos_state=DONT_USE_KERBEROS)
381 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
384 creds, "dnsserver", "seal", "SEAL",
385 self.rpc_ncacn_ip_tcp_ntlm_check)
# Kerberos-only to dnsserver over ncacn_ip_tcp with "seal": the final
# Authorization must report authType "krb5" and transportProtection "SEAL".
387 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
388 creds = self.insta_creds(template=self.get_credentials(),
389 kerberos_state=MUST_USE_KERBEROS)
390 self._test_rpc_ncacn_ip_tcp(["krb5",
392 "ENC-TS Pre-authentication",
393 "ENC-TS Pre-authentication"],
394 creds, "dnsserver", "seal", "SEAL",
395 self.rpc_ncacn_ip_tcp_krb5_check)
# LDAP over ldap:// to the host named by SERVER using the default test
# credentials. The terminating record is an LDAP Authorization with authType
# "krb5" and transportProtection "SIGN"; the records before it are successful
# "Kerberos KDC" pre-authentication events.
399 def isLastExpectedMessage(msg):
400 return (msg["type"] == "Authorization" and
401 msg["Authorization"]["serviceDescription"] == "LDAP" and
402 msg["Authorization"]["transportProtection"] == "SIGN" and
403 msg["Authorization"]["authType"] == "krb5")
405 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
406 lp = self.get_loadparm(),
407 credentials=self.get_credentials())
409 messages = self.waitForMessages(isLastExpectedMessage)
412 "Did not receive the expected number of messages")
414 # Check the first message it should be an Authentication
416 self.assertEquals("Authentication", msg["type"])
417 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
418 self.assertEquals("Kerberos KDC",
419 msg["Authentication"]["serviceDescription"])
420 self.assertEquals("ENC-TS Pre-authentication",
421 msg["Authentication"]["authDescription"])
# NOTE(review): the comment below repeats "first message"; presumably this
# group of assertions covers the second message -- confirm.
423 # Check the first message it should be an Authentication
425 self.assertEquals("Authentication", msg["type"])
426 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
427 self.assertEquals("Kerberos KDC",
428 msg["Authentication"]["serviceDescription"])
429 self.assertEquals("ENC-TS Pre-authentication",
430 msg["Authentication"]["authDescription"])
# LDAP over ldap:// addressed by SERVER_IP rather than by name. The expected
# result is an NTLMSSP Authentication followed by an LDAP Authorization with
# authType "NTLMSSP" and transportProtection "SEAL".
# NOTE(review): presumably connecting by IP address is what steers the client
# to NTLM instead of Kerberos -- confirm.
432 def test_ldap_ntlm(self):
434 def isLastExpectedMessage(msg):
435 return (msg["type"] == "Authorization" and
436 msg["Authorization"]["serviceDescription"] == "LDAP" and
437 msg["Authorization"]["transportProtection"] == "SEAL" and
438 msg["Authorization"]["authType"] == "NTLMSSP")
440 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
441 lp = self.get_loadparm(),
442 credentials=self.get_credentials())
444 messages = self.waitForMessages(isLastExpectedMessage)
447 "Did not receive the expected number of messages")
448 # Check the first message it should be an Authentication
450 self.assertEquals("Authentication", msg["type"])
451 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
452 self.assertEquals("LDAP",
453 msg["Authentication"]["serviceDescription"])
454 self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
# LDAP simple bind over ldaps:// with a DOMAIN\username bind DN. A simple bind
# sends the password to the server, so the test runs it over TLS and expects
# the Authorization to record transportProtection "TLS" and authType
# "simple bind".
456 def test_ldap_simple_bind(self):
457 def isLastExpectedMessage(msg):
458 return (msg["type"] == "Authorization" and
459 msg["Authorization"]["serviceDescription"] == "LDAP" and
460 msg["Authorization"]["transportProtection"] == "TLS" and
461 msg["Authorization"]["authType"] == "simple bind")
463 creds = self.insta_creds(template=self.get_credentials())
464 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
465 creds.get_username()))
467 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
468 lp = self.get_loadparm(),
471 messages = self.waitForMessages(isLastExpectedMessage)
474 "Did not receive the expected number of messages")
476 # Check the first message it should be an Authentication
478 self.assertEquals("Authentication", msg["type"])
479 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
480 self.assertEquals("LDAP",
481 msg["Authentication"]["serviceDescription"])
482 self.assertEquals("simple bind",
483 msg["Authentication"]["authDescription"])
# Failed simple bind with a valid user and a wrong password. The connection
# attempt must raise (thrown is asserted True) and the log must contain an
# LDAP "simple bind" Authentication with status NT_STATUS_WRONG_PASSWORD --
# the record a monitoring pipeline would count for password-guessing alerts.
485 def test_ldap_simple_bind_bad_password(self):
486 def isLastExpectedMessage(msg):
487 return (msg["type"] == "Authentication" and
488 msg["Authentication"]["serviceDescription"] == "LDAP" and
489 msg["Authentication"]["status"]
490 == "NT_STATUS_WRONG_PASSWORD" and
491 msg["Authentication"]["authDescription"] == "simple bind")
493 creds = self.insta_creds(template=self.get_credentials())
494 creds.set_password("badPassword")
495 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
496 creds.get_username()))
500 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
501 lp = self.get_loadparm(),
505 self.assertEquals(thrown, True)
507 messages = self.waitForMessages(isLastExpectedMessage)
510 "Did not receive the expected number of messages")
# Failed simple bind for a user name that does not exist ("badUser"). The
# expected log status is NT_STATUS_NO_SUCH_USER, distinct from the
# NT_STATUS_WRONG_PASSWORD logged for a known user, so the two failure causes
# can be told apart in the audit log.
513 def test_ldap_simple_bind_bad_user(self):
514 def isLastExpectedMessage(msg):
515 return (msg["type"] == "Authentication" and
516 msg["Authentication"]["serviceDescription"] == "LDAP" and
517 msg["Authentication"]["status"]
518 == "NT_STATUS_NO_SUCH_USER" and
519 msg["Authentication"]["authDescription"] == "simple bind")
521 creds = self.insta_creds(template=self.get_credentials())
522 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
526 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
527 lp = self.get_loadparm(),
531 self.assertEquals(thrown, True)
533 messages = self.waitForMessages(isLastExpectedMessage)
536 "Did not receive the expected number of messages")
# Failed simple bind expected to be logged as NT_STATUS_NO_SUCH_USER.
# NOTE(review): the bind DN built here has the same DOMAIN\name shape as in
# test_ldap_simple_bind_bad_user (only the name differs, "abdcef"), so as
# written this looks like another unknown-user case rather than a malformed
# name -- confirm the intended input.
539 def test_ldap_simple_bind_unparseable_user(self):
540 def isLastExpectedMessage(msg):
541 return (msg["type"] == "Authentication" and
542 msg["Authentication"]["serviceDescription"] == "LDAP" and
543 msg["Authentication"]["status"]
544 == "NT_STATUS_NO_SUCH_USER" and
545 msg["Authentication"]["authDescription"] == "simple bind")
547 creds = self.insta_creds(template=self.get_credentials())
548 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
552 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
553 lp = self.get_loadparm(),
557 self.assertEquals(thrown, True)
559 messages = self.waitForMessages(isLastExpectedMessage)
562 "Did not receive the expected number of messages")
565 # Note: as this test does not expect any messages it will
566 # time out in the call to self.waitForMessages.
567 # This is expected, but it will slow this test.
# Negative test: an anonymous bind over ldaps:// with no further operation.
# The test only passes after waitForMessages has waited out its timeout, so
# a pass shows that nothing arrived within that window.
568 def test_ldap_anonymous_access_bind_only(self):
569 # Should be no logging for anonymous bind
570 # so receiving any message indicates a failure.
571 def isLastExpectedMessage( msg):
574 creds = self.insta_creds(template=self.get_credentials())
# Switch the copied credentials to anonymous before connecting.
575 creds.set_anonymous()
577 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
578 lp = self.get_loadparm(),
581 messages = self.waitForMessages( isLastExpectedMessage)
584 "Did not receive the expected number of messages")
# Anonymous connection over ldaps:// followed by a search of the domain DN.
# The search is expected to fail (self.fail runs if it returns), and the
# attempt must be logged as an LDAP Authorization for account
# "ANONYMOUS LOGON" with authType "no bind" and transportProtection "TLS".
586 def test_ldap_anonymous_access(self):
587 def isLastExpectedMessage( msg):
588 return (msg["type"] == "Authorization" and
589 msg["Authorization"]["serviceDescription"] == "LDAP" and
590 msg["Authorization"]["transportProtection"] == "TLS" and
591 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
592 msg["Authorization"]["authType"] == "no bind")
594 creds = self.insta_creds(template=self.get_credentials())
595 creds.set_anonymous()
597 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
598 lp = self.get_loadparm(),
# Unlike the bind-only case, an actual operation is attempted here, which is
# what produces the Authorization record.
602 res = self.samdb.search(base=self.samdb.domain_dn())
603 self.fail( "Expected an LdbError exception")
607 messages = self.waitForMessages( isLastExpectedMessage)
610 "Did not receive the expected number of messages")
# SMB connection with the default test credentials. The terminating record is
# an SMB Authorization with authType "krb5" and transportProtection "SMB"; the
# records checked before it are successful "Kerberos KDC" pre-authentication
# events.
612 def isLastExpectedMessage(msg):
613 return (msg["type"] == "Authorization" and
614 msg["Authorization"]["serviceDescription"] == "SMB" and
615 msg["Authorization"]["authType"] == "krb5" and
616 msg["Authorization"]["transportProtection"] == "SMB")
618 creds = self.insta_creds(template=self.get_credentials())
621 lp=self.get_loadparm(),
624 messages = self.waitForMessages(isLastExpectedMessage)
627 "Did not receive the expected number of messages")
628 # Check the first message it should be an Authentication
630 self.assertEquals("Authentication", msg["type"])
631 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
632 self.assertEquals("Kerberos KDC",
633 msg["Authentication"]["serviceDescription"])
634 self.assertEquals("ENC-TS Pre-authentication",
635 msg["Authentication"]["authDescription"])
637 # Check the second message it should be an Authentication
639 self.assertEquals("Authentication", msg["type"])
640 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
641 self.assertEquals("Kerberos KDC",
642 msg["Authentication"]["serviceDescription"])
643 self.assertEquals("ENC-TS Pre-authentication",
644 msg["Authentication"]["authDescription"])
# SMB connection attempt with a wrong password while Kerberos is permitted.
# The failure is expected to be logged by the "Kerberos KDC" service at
# pre-authentication (NT_STATUS_WRONG_PASSWORD), not by the SMB service, and
# the client call must raise NTSTATUSError.
646 def test_smb_bad_password(self):
647 def isLastExpectedMessage(msg):
648 return (msg["type"] == "Authentication" and
649 msg["Authentication"]["serviceDescription"]
650 == "Kerberos KDC" and
651 msg["Authentication"]["status"]
652 == "NT_STATUS_WRONG_PASSWORD" and
653 msg["Authentication"]["authDescription"]
654 == "ENC-TS Pre-authentication")
656 creds = self.insta_creds(template=self.get_credentials())
657 creds.set_password("badPassword")
663 lp=self.get_loadparm(),
665 except NTSTATUSError:
667 self.assertEquals(thrown, True)
669 messages = self.waitForMessages(isLastExpectedMessage)
672 "Did not receive the expected number of messages")
# SMB connection attempt for an unknown user ("badUser") while Kerberos is
# permitted. The KDC is expected to log NT_STATUS_NO_SUCH_USER at
# pre-authentication and the client call must raise NTSTATUSError.
675 def test_smb_bad_user(self):
676 def isLastExpectedMessage(msg):
677 return (msg["type"] == "Authentication" and
678 msg["Authentication"]["serviceDescription"]
679 == "Kerberos KDC" and
680 msg["Authentication"]["status"]
681 == "NT_STATUS_NO_SUCH_USER" and
682 msg["Authentication"]["authDescription"]
683 == "ENC-TS Pre-authentication")
685 creds = self.insta_creds(template=self.get_credentials())
686 creds.set_username("badUser")
692 lp=self.get_loadparm(),
694 except NTSTATUSError:
696 self.assertEquals(thrown, True)
698 messages = self.waitForMessages(isLastExpectedMessage)
701 "Did not receive the expected number of messages")
# Anonymous SMB1 session to IPC$, driven through the smbclient binary with
# -mNT1 (maximum protocol NT1). The log is expected to show an NTLMSSP
# Authentication with NT_STATUS_NO_SUCH_USER, then one with NT_STATUS_OK whose
# becameAccount is "ANONYMOUS LOGON", both with passwordType "No-Password",
# and finally an SMB Authorization for "ANONYMOUS LOGON". Those field values
# are what identify anonymous sessions in the audit log.
# NOTE(review): the return code of call() is not checked, and "bin/smbclient"
# is resolved relative to the current directory; if the binary fails to start
# the test only fails later on the message wait. The local `auth` argument is
# presumably assigned on a line not shown and shares its name with the
# imported samba.auth module -- confirm.
703 def test_smb1_anonymous(self):
704 def isLastExpectedMessage(msg):
705 return (msg["type"] == "Authorization" and
706 msg["Authorization"]["serviceDescription"] == "SMB" and
707 msg["Authorization"]["authType"] == "NTLMSSP" and
708 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
709 msg["Authorization"]["transportProtection"] == "SMB")
711 server = os.environ["SERVER"]
713 path = "//%s/IPC$" % server
715 call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
717 messages = self.waitForMessages(isLastExpectedMessage)
720 "Did not receive the expected number of messages")
722 # Check the first message it should be an Authentication
724 self.assertEquals("Authentication", msg["type"])
725 self.assertEquals("NT_STATUS_NO_SUCH_USER",
726 msg["Authentication"]["status"])
727 self.assertEquals("SMB",
728 msg["Authentication"]["serviceDescription"])
729 self.assertEquals("NTLMSSP",
730 msg["Authentication"]["authDescription"])
731 self.assertEquals("No-Password",
732 msg["Authentication"]["passwordType"])
734 # Check the second message it should be an Authentication
736 self.assertEquals("Authentication", msg["type"])
737 self.assertEquals("NT_STATUS_OK",
738 msg["Authentication"]["status"])
739 self.assertEquals("SMB",
740 msg["Authentication"]["serviceDescription"])
741 self.assertEquals("NTLMSSP",
742 msg["Authentication"]["authDescription"])
743 self.assertEquals("No-Password",
744 msg["Authentication"]["passwordType"])
745 self.assertEquals("ANONYMOUS LOGON",
746 msg["Authentication"]["becameAccount"])
# Anonymous session to IPC$ through smbclient with -mSMB3. Same expected
# sequence as the SMB1 case, except that every record carries
# serviceDescription "SMB2": a failed then a successful "No-Password" NTLMSSP
# Authentication, the latter becoming "ANONYMOUS LOGON", then the
# Authorization.
# NOTE(review): the return code of call() is not checked here either.
748 def test_smb2_anonymous(self):
749 def isLastExpectedMessage(msg):
750 return (msg["type"] == "Authorization" and
751 msg["Authorization"]["serviceDescription"] == "SMB2" and
752 msg["Authorization"]["authType"] == "NTLMSSP" and
753 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
754 msg["Authorization"]["transportProtection"] == "SMB")
756 server = os.environ["SERVER"]
758 path = "//%s/IPC$" % server
760 call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
762 messages = self.waitForMessages(isLastExpectedMessage)
765 "Did not receive the expected number of messages")
767 # Check the first message it should be an Authentication
769 self.assertEquals("Authentication", msg["type"])
770 self.assertEquals("NT_STATUS_NO_SUCH_USER",
771 msg["Authentication"]["status"])
772 self.assertEquals("SMB2",
773 msg["Authentication"]["serviceDescription"])
774 self.assertEquals("NTLMSSP",
775 msg["Authentication"]["authDescription"])
776 self.assertEquals("No-Password",
777 msg["Authentication"]["passwordType"])
779 # Check the second message it should be an Authentication
781 self.assertEquals("Authentication", msg["type"])
782 self.assertEquals("NT_STATUS_OK",
783 msg["Authentication"]["status"])
784 self.assertEquals("SMB2",
785 msg["Authentication"]["serviceDescription"])
786 self.assertEquals("NTLMSSP",
787 msg["Authentication"]["authDescription"])
788 self.assertEquals("No-Password",
789 msg["Authentication"]["passwordType"])
790 self.assertEquals("ANONYMOUS LOGON",
791 msg["Authentication"]["becameAccount"])
# SMB logon with Kerberos disabled, so the SPNEGO exchange ends up on NTLMSSP.
# Expected: a successful SMB Authentication with authDescription "NTLMSSP" and
# passwordType "NTLMv2", then an SMB Authorization with authType "NTLMSSP".
793 def test_smb_no_krb_spnego(self):
794 def isLastExpectedMessage(msg):
795 return (msg["type"] == "Authorization" and
796 msg["Authorization"]["serviceDescription"] == "SMB" and
797 msg["Authorization"]["authType"] == "NTLMSSP" and
798 msg["Authorization"]["transportProtection"] == "SMB")
800 creds = self.insta_creds(template=self.get_credentials(),
801 kerberos_state=DONT_USE_KERBEROS)
804 lp=self.get_loadparm(),
807 messages = self.waitForMessages(isLastExpectedMessage)
810 "Did not receive the expected number of messages")
811 # Check the first message it should be an Authentication
813 self.assertEquals("Authentication", msg["type"])
814 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
815 self.assertEquals("SMB",
816 msg["Authentication"]["serviceDescription"])
817 self.assertEquals("NTLMSSP",
818 msg["Authentication"]["authDescription"])
819 self.assertEquals("NTLMv2",
820 msg["Authentication"]["passwordType"])
# NTLMSSP/NTLMv2 SMB logon with a wrong password. With Kerberos disabled the
# failure is logged by the SMB service itself (NT_STATUS_WRONG_PASSWORD), and
# the client call must raise NTSTATUSError.
822 def test_smb_no_krb_spnego_bad_password(self):
823 def isLastExpectedMessage(msg):
824 return (msg["type"] == "Authentication" and
825 msg["Authentication"]["serviceDescription"] == "SMB" and
826 msg["Authentication"]["authDescription"] == "NTLMSSP" and
827 msg["Authentication"]["passwordType"] == "NTLMv2" and
828 msg["Authentication"]["status"]
829 == "NT_STATUS_WRONG_PASSWORD")
831 creds = self.insta_creds(template=self.get_credentials(),
832 kerberos_state=DONT_USE_KERBEROS)
833 creds.set_password("badPassword")
839 lp=self.get_loadparm(),
841 except NTSTATUSError:
843 self.assertEquals(thrown, True)
845 messages = self.waitForMessages(isLastExpectedMessage)
848 "Did not receive the expected number of messages")
# NTLMSSP/NTLMv2 SMB logon for an unknown user ("badUser"); the SMB service is
# expected to log NT_STATUS_NO_SUCH_USER and the client call must raise
# NTSTATUSError.
850 def test_smb_no_krb_spnego_bad_user(self):
851 def isLastExpectedMessage(msg):
852 return (msg["type"] == "Authentication" and
853 msg["Authentication"]["serviceDescription"] == "SMB" and
854 msg["Authentication"]["authDescription"] == "NTLMSSP" and
855 msg["Authentication"]["passwordType"] == "NTLMv2" and
856 msg["Authentication"]["status"]
857 == "NT_STATUS_NO_SUCH_USER")
859 creds = self.insta_creds(template=self.get_credentials(),
860 kerberos_state=DONT_USE_KERBEROS)
861 creds.set_username("badUser")
867 lp=self.get_loadparm(),
869 except NTSTATUSError:
871 self.assertEquals(thrown, True)
873 messages = self.waitForMessages(isLastExpectedMessage)
876 "Did not receive the expected number of messages")
# Legacy SMB logon: Kerberos disabled and, per the test name, no SPNEGO and no
# NTLMv2. The log is expected to show authDescription/authType "bare-NTLM" and
# passwordType "NTLMv1". These are the field values an audit-log consumer can
# match to find NTLMv1 logons that should be phased out.
878 def test_smb_no_krb_no_spnego_no_ntlmv2(self):
879 def isLastExpectedMessage(msg):
880 return (msg["type"] == "Authorization" and
881 msg["Authorization"]["serviceDescription"] == "SMB" and
882 msg["Authorization"]["authType"] == "bare-NTLM" and
883 msg["Authorization"]["transportProtection"] == "SMB")
885 creds = self.insta_creds(template=self.get_credentials(),
886 kerberos_state=DONT_USE_KERBEROS)
889 lp=self.get_loadparm(),
894 messages = self.waitForMessages(isLastExpectedMessage)
897 "Did not receive the expected number of messages")
898 # Check the first message it should be an Authentication
900 self.assertEquals("Authentication", msg["type"])
901 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
902 self.assertEquals("SMB",
903 msg["Authentication"]["serviceDescription"])
904 self.assertEquals("bare-NTLM",
905 msg["Authentication"]["authDescription"])
906 self.assertEquals("NTLMv1",
907 msg["Authentication"]["passwordType"])
# bare-NTLM / NTLMv1 SMB logon with a wrong password: the SMB service is
# expected to log NT_STATUS_WRONG_PASSWORD and the client call must raise
# NTSTATUSError.
909 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
910 def isLastExpectedMessage(msg):
911 return (msg["type"] == "Authentication" and
912 msg["Authentication"]["serviceDescription"] == "SMB" and
913 msg["Authentication"]["authDescription"] == "bare-NTLM" and
914 msg["Authentication"]["passwordType"] == "NTLMv1" and
915 msg["Authentication"]["status"]
916 == "NT_STATUS_WRONG_PASSWORD")
918 creds = self.insta_creds(template=self.get_credentials(),
919 kerberos_state=DONT_USE_KERBEROS)
920 creds.set_password("badPassword")
926 lp=self.get_loadparm(),
930 except NTSTATUSError:
932 self.assertEquals(thrown, True)
935 messages = self.waitForMessages(isLastExpectedMessage)
938 "Did not receive the expected number of messages")
# bare-NTLM / NTLMv1 SMB logon for an unknown user ("badUser"): the SMB
# service is expected to log NT_STATUS_NO_SUCH_USER and the client call must
# raise NTSTATUSError.
940 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
941 def isLastExpectedMessage(msg):
942 return (msg["type"] == "Authentication" and
943 msg["Authentication"]["serviceDescription"] == "SMB" and
944 msg["Authentication"]["authDescription"] == "bare-NTLM" and
945 msg["Authentication"]["passwordType"] == "NTLMv1" and
946 msg["Authentication"]["status"]
947 == "NT_STATUS_NO_SUCH_USER")
949 creds = self.insta_creds(template=self.get_credentials(),
950 kerberos_state=DONT_USE_KERBEROS)
951 creds.set_username("badUser")
957 lp=self.get_loadparm(),
961 except NTSTATUSError:
963 self.assertEquals(thrown, True)
966 messages = self.waitForMessages(isLastExpectedMessage)
969 "Did not receive the expected number of messages")
# Run rpcclient's "samlogon" command with logon type 1 and wait for a
# successful Authentication record whose workstation field is
# \\AuthLogTests. Netlogon records are filtered out before counting, and
# either 5 or 6 remaining messages are accepted.
# NOTE(review): the account password from the PASSWORD environment variable is
# formatted into the -c argument, so it sits in the child process's argument
# list while rpcclient runs; that is tolerable only for a disposable selftest
# account. The values are also interpolated unquoted, so a password containing
# whitespace would change how the command string is split. The return code of
# call() is not checked.
971 def test_samlogon_interactive(self):
973 workstation = "AuthLogTests"
975 def isLastExpectedMessage( msg):
976 return (msg["type"] == "Authentication" and
977 msg["Authentication"]["serviceDescription"]
979 msg["Authentication"]["authDescription"]
981 msg["Authentication"]["status"] == "NT_STATUS_OK" and
982 msg["Authentication"]["workstation"]
983 == r"\\%s" % workstation)
985 server = os.environ["SERVER"]
986 user = os.environ["USERNAME"]
987 password = os.environ["PASSWORD"]
988 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
# -U% gives rpcclient an empty user name and password for the connection
# itself; the credentials under test travel inside the samlogon command.
991 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
993 messages = self.waitForMessages( isLastExpectedMessage)
994 messages = self.remove_netlogon_messages(messages)
995 received = len(messages)
997 (received == 5 or received == 6),
998 "Did not receive the expected number of messages")
# samlogon with logon type 1 and a deliberately wrong password. The expected
# terminating record is an "interactive" Authentication with status
# NT_STATUS_WRONG_PASSWORD and workstation \\AuthLogTests; 5 or 6 messages are
# accepted after netlogon records are removed.
# NOTE(review): the return code of call() is not checked, so a failure to
# start rpcclient surfaces only as a missing-message failure.
1000 def test_samlogon_interactive_bad_password(self):
1002 workstation = "AuthLogTests"
1004 def isLastExpectedMessage( msg):
1005 return (msg["type"] == "Authentication" and
1006 msg["Authentication"]["serviceDescription"]
1008 msg["Authentication"]["authDescription"]
1009 == "interactive" and
1010 msg["Authentication"]["status"]
1011 == "NT_STATUS_WRONG_PASSWORD" and
1012 msg["Authentication"]["workstation"]
1013 == r"\\%s" % workstation)
1015 server = os.environ["SERVER"]
1016 user = os.environ["USERNAME"]
# Fixed wrong value; the real account password is not used in this test.
1017 password = "badPassword"
1018 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1021 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1023 messages = self.waitForMessages( isLastExpectedMessage)
1024 messages = self.remove_netlogon_messages(messages)
1025 received = len(messages)
1027 (received == 5 or received == 6),
1028 "Did not receive the expected number of messages")
1030 def test_samlogon_interactive_bad_user(self):
# Runs rpcclient's "samlogon" command with logon type 1 and waits for the
# auth log to record the failure as NT_STATUS_NO_SUCH_USER, i.e. a status
# distinct from a wrong-password failure.
# NOTE(review): this listing skips source lines (the gutter numbers jump).
# The assignment of `user` (between source lines 1045 and 1047) is not
# visible; given the expected status it is presumably a non-existent
# account name - confirm. The expected serviceDescription value and the
# call that opens the final assertion are also not visible.
1032 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: an "interactive" Authentication
# event with status NT_STATUS_NO_SUCH_USER from "\\AuthLogTests".
1034 def isLastExpectedMessage( msg):
1035 return (msg["type"] == "Authentication" and
1036 msg["Authentication"]["serviceDescription"]
1038 msg["Authentication"]["authDescription"]
1039 == "interactive" and
1040 msg["Authentication"]["status"]
1041 == "NT_STATUS_NO_SUCH_USER" and
1042 msg["Authentication"]["workstation"]
1043 == r"\\%s" % workstation)
1045 server = os.environ["SERVER"]
1047 password = os.environ["PASSWORD"]
# The real test password is formatted into the rpcclient command string,
# so it is passed as a process argument and can be read from the process
# list on the test host; only throwaway credentials belong here.
1048 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1051 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1053 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1054 messages = self.remove_netlogon_messages(messages)
1055 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1057 (received == 5 or received == 6),
1058 "Did not receive the expected number of messages")
1060 def test_samlogon_network(self):
# Runs rpcclient's "samlogon" command with logon type 2 (the method name
# calls this a network logon) for the valid test account, then waits for
# the auth log to report a successful Authentication event.
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values and the call
# that opens the final assertion are not visible here.
1062 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: matches an Authentication event with
# status NT_STATUS_OK whose workstation field is "\\AuthLogTests".
1064 def isLastExpectedMessage( msg):
1065 return (msg["type"] == "Authentication" and
1066 msg["Authentication"]["serviceDescription"]
1068 msg["Authentication"]["authDescription"]
1070 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1071 msg["Authentication"]["workstation"]
1072 == r"\\%s" % workstation)
1074 server = os.environ["SERVER"]
1075 user = os.environ["USERNAME"]
1076 password = os.environ["PASSWORD"]
# The password is formatted into the rpcclient command string, so it is
# passed as a process argument and can be read from the process list on
# the test host; only throwaway test-environment credentials belong here.
1077 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone. "-U%" presumably means an empty user name and
# password for the connection - confirm.
1080 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1082 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1083 messages = self.remove_netlogon_messages(messages)
1084 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1086 (received == 5 or received == 6),
1087 "Did not receive the expected number of messages")
1089 def test_samlogon_network_bad_password(self):
# Runs rpcclient's "samlogon" command with logon type 2 for the real test
# user but a wrong password, and waits for the auth log to record the
# failure as NT_STATUS_WRONG_PASSWORD rather than a generic failure.
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values and the call
# that opens the final assertion are not visible here.
1091 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: an Authentication event with status
# NT_STATUS_WRONG_PASSWORD whose workstation is "\\AuthLogTests".
1093 def isLastExpectedMessage( msg):
1094 return (msg["type"] == "Authentication" and
1095 msg["Authentication"]["serviceDescription"]
1097 msg["Authentication"]["authDescription"]
1099 msg["Authentication"]["status"]
1100 == "NT_STATUS_WRONG_PASSWORD" and
1101 msg["Authentication"]["workstation"]
1102 == r"\\%s" % workstation)
1104 server = os.environ["SERVER"]
1105 user = os.environ["USERNAME"]
# Fixed, deliberately wrong value; not a secret.
1106 password = "badPassword"
1107 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of call() is discarded; a failed logon is expected and
# the outcome is judged from the logged messages alone.
1110 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1112 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1113 messages = self.remove_netlogon_messages(messages)
1114 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1116 (received == 5 or received == 6),
1117 "Did not receive the expected number of messages")
1119 def test_samlogon_network_bad_user(self):
# Runs rpcclient's "samlogon" command with logon type 2 and waits for the
# auth log to record the failure as NT_STATUS_NO_SUCH_USER, i.e. a status
# distinct from a wrong-password failure.
# NOTE(review): this listing skips source lines (the gutter numbers jump).
# The assignment of `user` (between source lines 1134 and 1136) is not
# visible; given the expected status it is presumably a non-existent
# account name - confirm. The expected serviceDescription and
# authDescription values and the call that opens the final assertion are
# also not visible.
1121 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: an Authentication event with status
# NT_STATUS_NO_SUCH_USER whose workstation is "\\AuthLogTests".
1123 def isLastExpectedMessage( msg):
1124 return (msg["type"] == "Authentication" and
1125 msg["Authentication"]["serviceDescription"]
1127 msg["Authentication"]["authDescription"]
1129 msg["Authentication"]["status"]
1130 == "NT_STATUS_NO_SUCH_USER" and
1131 msg["Authentication"]["workstation"]
1132 == r"\\%s" % workstation)
1134 server = os.environ["SERVER"]
1136 password = os.environ["PASSWORD"]
# The real test password is formatted into the rpcclient command string,
# so it is passed as a process argument and can be read from the process
# list on the test host; only throwaway credentials belong here.
1137 samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1140 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1142 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1143 messages = self.remove_netlogon_messages(messages)
1144 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1146 (received == 5 or received == 6),
1147 "Did not receive the expected number of messages")
1149 def test_samlogon_network_mschap(self):
# Runs rpcclient's "samlogon" command with logon type 2 plus an extra
# 0x00010000 argument for the valid test account, then waits for a
# successful Authentication event whose passwordType is "MSCHAPv2".
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values and the call
# that opens the final assertion are not visible here.
1151 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: status NT_STATUS_OK, passwordType
# "MSCHAPv2", workstation "\\AuthLogTests".
1153 def isLastExpectedMessage( msg):
1154 return (msg["type"] == "Authentication" and
1155 msg["Authentication"]["serviceDescription"]
1157 msg["Authentication"]["authDescription"]
1159 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1160 msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1161 msg["Authentication"]["workstation"]
1162 == r"\\%s" % workstation)
1164 server = os.environ["SERVER"]
1165 user = os.environ["USERNAME"]
1166 password = os.environ["PASSWORD"]
# 0x00010000 is presumably the MSV1_0_ALLOW_MSVCHAPV2 logon-parameter flag,
# which would explain the "MSCHAPv2" passwordType expected above - confirm.
# The password is part of this command string, so it is passed as a
# process argument and can be read from the process list on the test host.
1167 samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1170 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1172 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1173 messages = self.remove_netlogon_messages(messages)
1174 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1176 (received == 5 or received == 6),
1177 "Did not receive the expected number of messages")
1179 def test_samlogon_network_mschap_bad_password(self):
# Runs rpcclient's "samlogon" command with logon type 2 plus an extra
# 0x00010000 argument for the real test user but a wrong password, and
# waits for an Authentication event with status NT_STATUS_WRONG_PASSWORD
# and passwordType "MSCHAPv2".
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values and the call
# that opens the final assertion are not visible here.
1181 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: status NT_STATUS_WRONG_PASSWORD,
# passwordType "MSCHAPv2", workstation "\\AuthLogTests".
1183 def isLastExpectedMessage( msg):
1184 return (msg["type"] == "Authentication" and
1185 msg["Authentication"]["serviceDescription"]
1187 msg["Authentication"]["authDescription"]
1189 msg["Authentication"]["status"]
1190 == "NT_STATUS_WRONG_PASSWORD" and
1191 msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1192 msg["Authentication"]["workstation"]
1193 == r"\\%s" % workstation)
1195 server = os.environ["SERVER"]
1196 user = os.environ["USERNAME"]
# Fixed, deliberately wrong value; not a secret.
1197 password = "badPassword"
# 0x00010000 is presumably the MSV1_0_ALLOW_MSVCHAPV2 logon-parameter flag,
# which would explain the "MSCHAPv2" passwordType expected above - confirm.
1198 samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
# The exit status of call() is discarded; a failed logon is expected and
# the outcome is judged from the logged messages alone.
1201 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1203 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1204 messages = self.remove_netlogon_messages(messages)
1205 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1207 (received == 5 or received == 6),
1208 "Did not receive the expected number of messages")
1210 def test_samlogon_network_mschap_bad_user(self):
# Runs rpcclient's "samlogon" command with logon type 2 plus an extra
# 0x00010000 argument and waits for an Authentication event with status
# NT_STATUS_NO_SUCH_USER and passwordType "MSCHAPv2".
# NOTE(review): this listing skips source lines (the gutter numbers jump).
# The assignment of `user` (between source lines 1226 and 1228) is not
# visible; given the expected status it is presumably a non-existent
# account name - confirm. The expected serviceDescription and
# authDescription values and the call that opens the final assertion are
# also not visible.
1212 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: status NT_STATUS_NO_SUCH_USER,
# passwordType "MSCHAPv2", workstation "\\AuthLogTests".
1214 def isLastExpectedMessage( msg):
1215 return (msg["type"] == "Authentication" and
1216 msg["Authentication"]["serviceDescription"]
1218 msg["Authentication"]["authDescription"]
1220 msg["Authentication"]["status"]
1221 == "NT_STATUS_NO_SUCH_USER" and
1222 msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1223 msg["Authentication"]["workstation"]
1224 == r"\\%s" % workstation)
1226 server = os.environ["SERVER"]
1228 password = os.environ["PASSWORD"]
# 0x00010000 is presumably the MSV1_0_ALLOW_MSVCHAPV2 logon-parameter flag
# - confirm. The real test password is part of this command string, so it
# is passed as a process argument and can be read from the process list on
# the test host.
1229 samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1232 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1234 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1235 messages = self.remove_netlogon_messages(messages)
1236 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1238 (received == 5 or received == 6),
1239 "Did not receive the expected number of messages")
1241 def test_samlogon_schannel_seal(self):
# Runs the rpcclient command sequence "schannel;samlogon ..." for the valid
# test account, waits for a successful Authentication event, and then
# checks that an Authorization event recorded authType "schannel" with
# transportProtection "SEAL" for the DCE/RPC service.
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values, the call that
# opens the count assertion and the assignment of `msg` are not visible.
1243 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: matches an Authentication event with
# status NT_STATUS_OK whose workstation field is "\\AuthLogTests".
1245 def isLastExpectedMessage( msg):
1246 return (msg["type"] == "Authentication" and
1247 msg["Authentication"]["serviceDescription"]
1249 msg["Authentication"]["authDescription"]
1251 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1252 msg["Authentication"]["workstation"]
1253 == r"\\%s" % workstation)
1255 server = os.environ["SERVER"]
1256 user = os.environ["USERNAME"]
1257 password = os.environ["PASSWORD"]
# Two rpcclient commands separated by ";": "schannel" first, then the
# samlogon itself (no logon-type argument is given here). The password is
# part of this string, so it is passed as a process argument and can be
# read from the process list on the test host.
1258 samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1261 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1263 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1264 messages = self.remove_netlogon_messages(messages)
1265 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1267 (received == 5 or received == 6),
1268 "Did not receive the expected number of messages")
1270 # Check the second to last message it should be an Authorization
# `msg` is presumably messages[-2], per the comment above; the assignment
# itself is missing from this listing - confirm.
1272 self.assertEquals("Authorization", msg["type"])
1273 self.assertEquals("DCE/RPC",
1274 msg["Authorization"]["serviceDescription"])
1275 self.assertEquals("schannel", msg["Authorization"]["authType"])
# Pins the logged protection level, so a change that stops recording the
# channel as sealed fails here.
1276 self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1278 # Signed logons get promoted to sealed; this test ensures that
1279 # this behaviour is not removed accidentally
1280 def test_samlogon_schannel_sign(self):
# Runs the rpcclient command sequence "schannelsign;samlogon ..." for the
# valid test account, waits for a successful Authentication event, and
# then checks that the Authorization event still records
# transportProtection "SEAL" even though signing was what the command
# asked for.
# NOTE(review): this listing skips source lines (the gutter numbers jump),
# so the expected serviceDescription/authDescription values, the call that
# opens the count assertion and the assignment of `msg` are not visible.
1282 workstation = "AuthLogTests"
# Predicate handed to waitForMessages: matches an Authentication event with
# status NT_STATUS_OK whose workstation field is "\\AuthLogTests".
1284 def isLastExpectedMessage( msg):
1285 return (msg["type"] == "Authentication" and
1286 msg["Authentication"]["serviceDescription"]
1288 msg["Authentication"]["authDescription"]
1290 msg["Authentication"]["status"] == "NT_STATUS_OK" and
1291 msg["Authentication"]["workstation"]
1292 == r"\\%s" % workstation)
1294 server = os.environ["SERVER"]
1295 user = os.environ["USERNAME"]
1296 password = os.environ["PASSWORD"]
# Two rpcclient commands separated by ";": "schannelsign" first, then the
# samlogon itself (no logon-type argument is given here). The password is
# part of this string, so it is passed as a process argument and can be
# read from the process list on the test host.
1297 samlogon = "schannelsign;samlogon %s %s %s" % (user, password, workstation)
# The exit status of call() is discarded; the outcome is judged from the
# logged messages alone.
1300 call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1302 messages = self.waitForMessages( isLastExpectedMessage)
# remove_netlogon_messages is defined outside this view; the name suggests
# it filters NETLOGON events out before the count - confirm.
1303 messages = self.remove_netlogon_messages(messages)
1304 received = len(messages)
# Either 5 or 6 remaining messages is accepted.
1306 (received == 5 or received == 6),
1307 "Did not receive the expected number of messages")
1309 # Check the second to last message it should be an Authorization
# `msg` is presumably messages[-2], per the comment above; the assignment
# itself is missing from this listing - confirm.
1311 self.assertEquals("Authorization", msg["type"])
1312 self.assertEquals("DCE/RPC",
1313 msg["Authorization"]["serviceDescription"])
1314 self.assertEquals("schannel", msg["Authorization"]["authType"])
# "SEAL", not a sign-only value: a regression that left the channel merely
# signed would fail this assertion.
1315 self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])