python: tests: Make tests of dsdb Python module Python 3 compatible
[samba.git] / python / samba / tests / auth_log.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for the Auth and AuthZ logging.
19 """
20
21 from samba import auth
22 import samba.tests
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 from samba.dcerpc import srvsvc, dnsserver
26 import time
27 import json
28 import os
29 from samba import smb
30 from samba.samdb import SamDB
31 import samba.tests.auth_log_base
32 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
33 from samba import NTSTATUSError
34 from subprocess import call
35 from ldb import LdbError
36
37 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
38
39     def setUp(self):
40         super(AuthLogTests, self).setUp()
41         self.remoteAddress = os.environ["CLIENT_IP"]
42
43     def tearDown(self):
44         super(AuthLogTests, self).tearDown()
45
46
47
48     def _test_rpc_ncacn_np(self, authTypes, creds, service,
49                            binding, protection, checkFunction):
50         def isLastExpectedMessage(msg):
51             return (msg["type"] == "Authorization" and
52                     (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
53                      msg["Authorization"]["serviceDescription"] == service) and
54                     msg["Authorization"]["authType"] == authTypes[0] and
55                     msg["Authorization"]["transportProtection"] == protection)
56
57         if binding:
58             binding = "[%s]" % binding
59
60         if service == "dnsserver":
61             x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
62                                 self.get_loadparm(),
63                                 creds)
64         elif service == "srvsvc":
65             x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
66                               self.get_loadparm(),
67                               creds)
68
69         # The connection is passed to ensure the server
70         # messaging context stays up until all the messages have been received.
71         messages = self.waitForMessages(isLastExpectedMessage, x)
72         checkFunction(messages, authTypes, service, binding, protection)
73
74     def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
75                                 binding, protection):
76
77         expected_messages = len(authTypes)
78         self.assertEquals(expected_messages,
79                           len(messages),
80                           "Did not receive the expected number of messages")
81
82         # Check the first message it should be an Authentication
83         msg = messages[0]
84         self.assertEquals("Authentication", msg["type"])
85         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
86         self.assertEquals("SMB",
87                            msg["Authentication"]["serviceDescription"])
88         self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
89
90         # Check the second message it should be an Authorization
91         msg = messages[1]
92         self.assertEquals("Authorization", msg["type"])
93         self.assertEquals("SMB",
94                           msg["Authorization"]["serviceDescription"])
95         self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
96         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
97
98         # Check the third message it should be an Authentication
99         # if we are expecting 4 messages
100         if expected_messages == 4:
101             def checkServiceDescription(desc):
102                 return (desc == "DCE/RPC" or desc == service)
103
104             msg = messages[2]
105             self.assertEquals("Authentication", msg["type"])
106             self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
107             self.assertTrue(
108                 checkServiceDescription(msg["Authentication"]["serviceDescription"]))
109
110             self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
111
112     def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
113
114         expected_messages = len(authTypes)
115         self.assertEquals(expected_messages,
116                           len(messages),
117                           "Did not receive the expected number of messages")
118
119         # Check the first message it should be an Authentication
120         # This is almost certainly Authentication over UDP, and is probably
121         # returning message too big,
122         msg = messages[0]
123         self.assertEquals("Authentication", msg["type"])
124         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
125         self.assertEquals("Kerberos KDC",
126                            msg["Authentication"]["serviceDescription"])
127         self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
128
129         # Check the second message it should be an Authentication
130         # This this the TCP Authentication in response to the message too big
131         # response to the UDP Authentication
132         msg = messages[1]
133         self.assertEquals("Authentication", msg["type"])
134         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
135         self.assertEquals("Kerberos KDC",
136                            msg["Authentication"]["serviceDescription"])
137         self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
138
139         # Check the third message it should be an Authorization
140         msg = messages[2]
141         self.assertEquals("Authorization", msg["type"])
142         serviceDescription = "SMB"
143         print "binding %s" % binding
144         if binding == "[smb2]":
145             serviceDescription = "SMB2"
146
147         self.assertEquals(serviceDescription,
148                           msg["Authorization"]["serviceDescription"])
149         self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
150         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
151
152
153     def test_rpc_ncacn_np_ntlm_dns_sign(self):
154         creds = self.insta_creds(template=self.get_credentials(),
155                                  kerberos_state=DONT_USE_KERBEROS)
156         self._test_rpc_ncacn_np(["NTLMSSP",
157                                  "NTLMSSP",
158                                  "NTLMSSP",
159                                  "NTLMSSP"],
160                                 creds, "dnsserver", "sign", "SIGN",
161                                 self.rpc_ncacn_np_ntlm_check)
162
163     def test_rpc_ncacn_np_ntlm_srv_sign(self):
164         creds = self.insta_creds(template=self.get_credentials(),
165                                  kerberos_state=DONT_USE_KERBEROS)
166         self._test_rpc_ncacn_np(["NTLMSSP",
167                                  "NTLMSSP",
168                                  "NTLMSSP",
169                                  "NTLMSSP"],
170                                 creds, "srvsvc", "sign", "SIGN",
171                                 self.rpc_ncacn_np_ntlm_check)
172
173     def test_rpc_ncacn_np_ntlm_dns(self):
174         creds = self.insta_creds(template=self.get_credentials(),
175                                  kerberos_state=DONT_USE_KERBEROS)
176         self._test_rpc_ncacn_np(["ncacn_np",
177                                  "NTLMSSP",
178                                  "NTLMSSP"],
179                                 creds, "dnsserver", "", "SMB",
180                                 self.rpc_ncacn_np_ntlm_check)
181
182     def test_rpc_ncacn_np_ntlm_srv(self):
183         creds = self.insta_creds(template=self.get_credentials(),
184                                  kerberos_state=DONT_USE_KERBEROS)
185         self._test_rpc_ncacn_np(["ncacn_np",
186                                  "NTLMSSP",
187                                  "NTLMSSP"],
188                                 creds, "srvsvc", "", "SMB",
189                                 self.rpc_ncacn_np_ntlm_check)
190
191     def test_rpc_ncacn_np_krb_dns_sign(self):
192         creds = self.insta_creds(template=self.get_credentials(),
193                                  kerberos_state=MUST_USE_KERBEROS)
194         self._test_rpc_ncacn_np(["krb5",
195                                  "ENC-TS Pre-authentication",
196                                  "ENC-TS Pre-authentication",
197                                  "krb5"],
198                                  creds, "dnsserver", "sign", "SIGN",
199                                  self.rpc_ncacn_np_krb5_check)
200
201     def test_rpc_ncacn_np_krb_srv_sign(self):
202         creds = self.insta_creds(template=self.get_credentials(),
203                                  kerberos_state=MUST_USE_KERBEROS)
204         self._test_rpc_ncacn_np(["krb5",
205                                  "ENC-TS Pre-authentication",
206                                  "ENC-TS Pre-authentication",
207                                  "krb5"],
208                                  creds, "srvsvc", "sign", "SIGN",
209                                  self.rpc_ncacn_np_krb5_check)
210
211     def test_rpc_ncacn_np_krb_dns(self):
212         creds = self.insta_creds(template=self.get_credentials(),
213                                  kerberos_state=MUST_USE_KERBEROS)
214         self._test_rpc_ncacn_np(["ncacn_np",
215                                  "ENC-TS Pre-authentication",
216                                  "ENC-TS Pre-authentication",
217                                  "krb5"],
218                                 creds, "dnsserver", "", "SMB",
219                                 self.rpc_ncacn_np_krb5_check)
220
221     def test_rpc_ncacn_np_krb_dns_smb2(self):
222         creds = self.insta_creds(template=self.get_credentials(),
223                                  kerberos_state=MUST_USE_KERBEROS)
224         self._test_rpc_ncacn_np(["ncacn_np",
225                                  "ENC-TS Pre-authentication",
226                                  "ENC-TS Pre-authentication",
227                                  "krb5"],
228                                 creds, "dnsserver", "smb2", "SMB",
229                                 self.rpc_ncacn_np_krb5_check)
230
231     def test_rpc_ncacn_np_krb_srv(self):
232         creds = self.insta_creds(template=self.get_credentials(),
233                                  kerberos_state=MUST_USE_KERBEROS)
234         self._test_rpc_ncacn_np(["ncacn_np",
235                                 "ENC-TS Pre-authentication",
236                                 "ENC-TS Pre-authentication",
237                                 "krb5"],
238                                 creds, "srvsvc", "", "SMB",
239                                 self.rpc_ncacn_np_krb5_check)
240
241     def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
242                                binding, protection, checkFunction):
243         def isLastExpectedMessage(msg):
244             return (msg["type"] == "Authorization" and
245                     msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
246                     msg["Authorization"]["authType"] == authTypes[0] and
247                     msg["Authorization"]["transportProtection"] == protection)
248
249         if binding:
250             binding = "[%s]" % binding
251
252         if service == "dnsserver":
253             conn = dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
254                                        self.get_loadparm(),
255                                        creds)
256         elif service == "srvsvc":
257             conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
258                                  self.get_loadparm(),
259                                  creds)
260
261
262         messages = self.waitForMessages(isLastExpectedMessage, conn)
263         checkFunction(messages, authTypes, service, binding, protection)
264
265     def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
266                                     binding, protection):
267
268         expected_messages = len(authTypes)
269         self.assertEquals(expected_messages,
270                           len(messages),
271                           "Did not receive the expected number of messages")
272
273         # Check the first message it should be an Authorization
274         msg = messages[0]
275         self.assertEquals("Authorization", msg["type"])
276         self.assertEquals("DCE/RPC",
277                           msg["Authorization"]["serviceDescription"])
278         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
279         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
280
281         # Check the second message it should be an Authentication
282         msg = messages[1]
283         self.assertEquals("Authentication", msg["type"])
284         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
285         self.assertEquals("DCE/RPC",
286                            msg["Authentication"]["serviceDescription"])
287         self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
288
289     def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
290                                     binding, protection):
291
292         expected_messages = len(authTypes)
293         self.assertEquals(expected_messages,
294                           len(messages),
295                           "Did not receive the expected number of messages")
296
297         # Check the first message it should be an Authorization
298         msg = messages[0]
299         self.assertEquals("Authorization", msg["type"])
300         self.assertEquals("DCE/RPC",
301                           msg["Authorization"]["serviceDescription"])
302         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
303         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
304
305         # Check the second message it should be an Authentication
306         msg = messages[1]
307         self.assertEquals("Authentication", msg["type"])
308         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
309         self.assertEquals("Kerberos KDC",
310                            msg["Authentication"]["serviceDescription"])
311         self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
312
313         # Check the third message it should be an Authentication
314         msg = messages[2]
315         self.assertEquals("Authentication", msg["type"])
316         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
317         self.assertEquals("Kerberos KDC",
318                            msg["Authentication"]["serviceDescription"])
319         self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
320
321     def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
322         creds = self.insta_creds(template=self.get_credentials(),
323                                  kerberos_state=DONT_USE_KERBEROS)
324         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
325                                      "ncacn_ip_tcp",
326                                      "NTLMSSP"],
327                                      creds, "dnsserver", "sign", "SIGN",
328                                      self.rpc_ncacn_ip_tcp_ntlm_check)
329
330     def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
331         creds = self.insta_creds(template=self.get_credentials(),
332                                  kerberos_state=MUST_USE_KERBEROS)
333         self._test_rpc_ncacn_ip_tcp(["krb5",
334                                      "ncacn_ip_tcp",
335                                      "ENC-TS Pre-authentication",
336                                      "ENC-TS Pre-authentication"],
337                                      creds, "dnsserver", "sign", "SIGN",
338                                      self.rpc_ncacn_ip_tcp_krb5_check)
339
340     def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
341         creds = self.insta_creds(template=self.get_credentials(),
342                                  kerberos_state=DONT_USE_KERBEROS)
343         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
344                                      "ncacn_ip_tcp",
345                                      "NTLMSSP"],
346                                      creds, "dnsserver", "", "SIGN",
347                                      self.rpc_ncacn_ip_tcp_ntlm_check)
348
349     def test_rpc_ncacn_ip_tcp_krb5_dns(self):
350         creds = self.insta_creds(template=self.get_credentials(),
351                                  kerberos_state=MUST_USE_KERBEROS)
352         self._test_rpc_ncacn_ip_tcp(["krb5",
353                                      "ncacn_ip_tcp",
354                                      "ENC-TS Pre-authentication",
355                                      "ENC-TS Pre-authentication"],
356                                      creds, "dnsserver", "", "SIGN",
357                                      self.rpc_ncacn_ip_tcp_krb5_check)
358
359     def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
360         creds = self.insta_creds(template=self.get_credentials(),
361                                  kerberos_state=DONT_USE_KERBEROS)
362         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
363                                      "ncacn_ip_tcp",
364                                      "NTLMSSP"],
365                                      creds, "dnsserver", "connect", "NONE",
366                                      self.rpc_ncacn_ip_tcp_ntlm_check)
367
368     def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
369         creds = self.insta_creds(template=self.get_credentials(),
370                                  kerberos_state=MUST_USE_KERBEROS)
371         self._test_rpc_ncacn_ip_tcp(["krb5",
372                                      "ncacn_ip_tcp",
373                                      "ENC-TS Pre-authentication",
374                                      "ENC-TS Pre-authentication"],
375                                      creds, "dnsserver", "connect", "NONE",
376                                      self.rpc_ncacn_ip_tcp_krb5_check)
377
378     def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
379         creds = self.insta_creds(template=self.get_credentials(),
380                                  kerberos_state=DONT_USE_KERBEROS)
381         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
382                                      "ncacn_ip_tcp",
383                                      "NTLMSSP"],
384                                      creds, "dnsserver", "seal", "SEAL",
385                                      self.rpc_ncacn_ip_tcp_ntlm_check)
386
387     def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
388         creds = self.insta_creds(template=self.get_credentials(),
389                                  kerberos_state=MUST_USE_KERBEROS)
390         self._test_rpc_ncacn_ip_tcp(["krb5",
391                                      "ncacn_ip_tcp",
392                                      "ENC-TS Pre-authentication",
393                                      "ENC-TS Pre-authentication"],
394                                      creds, "dnsserver", "seal", "SEAL",
395                                      self.rpc_ncacn_ip_tcp_krb5_check)
396
397     def test_ldap(self):
398
399         def isLastExpectedMessage(msg):
400             return (msg["type"] == "Authorization" and
401                     msg["Authorization"]["serviceDescription"] == "LDAP" and
402                     msg["Authorization"]["transportProtection"] == "SIGN" and
403                     msg["Authorization"]["authType"] == "krb5")
404
405         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
406                            lp = self.get_loadparm(),
407                            credentials=self.get_credentials())
408
409         messages = self.waitForMessages(isLastExpectedMessage)
410         self.assertEquals(3,
411                           len(messages),
412                           "Did not receive the expected number of messages")
413
414         # Check the first message it should be an Authentication
415         msg = messages[0]
416         self.assertEquals("Authentication", msg["type"])
417         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
418         self.assertEquals("Kerberos KDC",
419                            msg["Authentication"]["serviceDescription"])
420         self.assertEquals("ENC-TS Pre-authentication",
421                            msg["Authentication"]["authDescription"])
422
423         # Check the first message it should be an Authentication
424         msg = messages[1]
425         self.assertEquals("Authentication", msg["type"])
426         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
427         self.assertEquals("Kerberos KDC",
428                            msg["Authentication"]["serviceDescription"])
429         self.assertEquals("ENC-TS Pre-authentication",
430                            msg["Authentication"]["authDescription"])
431
432     def test_ldap_ntlm(self):
433
434         def isLastExpectedMessage(msg):
435             return (msg["type"] == "Authorization" and
436                     msg["Authorization"]["serviceDescription"] == "LDAP" and
437                     msg["Authorization"]["transportProtection"] == "SEAL" and
438                     msg["Authorization"]["authType"] == "NTLMSSP")
439
440         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
441                            lp = self.get_loadparm(),
442                            credentials=self.get_credentials())
443
444         messages = self.waitForMessages(isLastExpectedMessage)
445         self.assertEquals(2,
446                           len(messages),
447                           "Did not receive the expected number of messages")
448         # Check the first message it should be an Authentication
449         msg = messages[0]
450         self.assertEquals("Authentication", msg["type"])
451         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
452         self.assertEquals("LDAP",
453                            msg["Authentication"]["serviceDescription"])
454         self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
455
456     def test_ldap_simple_bind(self):
457         def isLastExpectedMessage(msg):
458             return (msg["type"] == "Authorization" and
459                     msg["Authorization"]["serviceDescription"] == "LDAP" and
460                     msg["Authorization"]["transportProtection"] == "TLS" and
461                     msg["Authorization"]["authType"] == "simple bind")
462
463         creds = self.insta_creds(template=self.get_credentials())
464         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
465                                      creds.get_username()))
466
467         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
468                            lp = self.get_loadparm(),
469                            credentials=creds)
470
471         messages = self.waitForMessages(isLastExpectedMessage)
472         self.assertEquals(2,
473                           len(messages),
474                           "Did not receive the expected number of messages")
475
476         # Check the first message it should be an Authentication
477         msg = messages[0]
478         self.assertEquals("Authentication", msg["type"])
479         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
480         self.assertEquals("LDAP",
481                            msg["Authentication"]["serviceDescription"])
482         self.assertEquals("simple bind",
483                            msg["Authentication"]["authDescription"])
484
485     def test_ldap_simple_bind_bad_password(self):
486         def isLastExpectedMessage(msg):
487             return (msg["type"] == "Authentication" and
488                     msg["Authentication"]["serviceDescription"] == "LDAP" and
489                     msg["Authentication"]["status"]
490                         == "NT_STATUS_WRONG_PASSWORD" and
491                     msg["Authentication"]["authDescription"] == "simple bind")
492
493         creds = self.insta_creds(template=self.get_credentials())
494         creds.set_password("badPassword")
495         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
496                                      creds.get_username()))
497
498         thrown = False
499         try:
500             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
501                                lp = self.get_loadparm(),
502                                credentials=creds)
503         except LdbError:
504             thrown = True
505         self.assertEquals(thrown, True)
506
507         messages = self.waitForMessages(isLastExpectedMessage)
508         self.assertEquals(1,
509                           len(messages),
510                           "Did not receive the expected number of messages")
511
512
513     def test_ldap_simple_bind_bad_user(self):
514         def isLastExpectedMessage(msg):
515             return (msg["type"] == "Authentication" and
516                     msg["Authentication"]["serviceDescription"] == "LDAP" and
517                     msg["Authentication"]["status"]
518                         == "NT_STATUS_NO_SUCH_USER" and
519                     msg["Authentication"]["authDescription"] == "simple bind")
520
521         creds = self.insta_creds(template=self.get_credentials())
522         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
523
524         thrown = False
525         try:
526             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
527                                lp = self.get_loadparm(),
528                                credentials=creds)
529         except LdbError:
530             thrown = True
531         self.assertEquals(thrown, True)
532
533         messages = self.waitForMessages(isLastExpectedMessage)
534         self.assertEquals(1,
535                           len(messages),
536                           "Did not receive the expected number of messages")
537
538
539     def test_ldap_simple_bind_unparseable_user(self):
540         def isLastExpectedMessage(msg):
541             return (msg["type"] == "Authentication" and
542                     msg["Authentication"]["serviceDescription"] == "LDAP" and
543                     msg["Authentication"]["status"]
544                         == "NT_STATUS_NO_SUCH_USER" and
545                     msg["Authentication"]["authDescription"] == "simple bind")
546
547         creds = self.insta_creds(template=self.get_credentials())
548         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
549
550         thrown = False
551         try:
552             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
553                                lp = self.get_loadparm(),
554                                credentials=creds)
555         except LdbError:
556             thrown = True
557         self.assertEquals(thrown, True)
558
559         messages = self.waitForMessages(isLastExpectedMessage)
560         self.assertEquals(1,
561                           len(messages),
562                           "Did not receive the expected number of messages")
563
564     #
565     # Note: as this test does not expect any messages it will
566     #       time out in the call to self.waitForMessages.
567     #       This is expected, but it will slow this test.
568     def test_ldap_anonymous_access_bind_only(self):
569         # Should be no logging for anonymous bind
570         # so receiving any message indicates a failure.
571         def isLastExpectedMessage( msg):
572             return True
573
574         creds = self.insta_creds(template=self.get_credentials())
575         creds.set_anonymous()
576
577         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
578                            lp = self.get_loadparm(),
579                            credentials=creds)
580
581         messages = self.waitForMessages( isLastExpectedMessage)
582         self.assertEquals(0,
583                           len(messages),
584                           "Did not receive the expected number of messages")
585
586     def test_ldap_anonymous_access(self):
587         def isLastExpectedMessage( msg):
588             return (msg["type"] == "Authorization" and
589                     msg["Authorization"]["serviceDescription"]  == "LDAP" and
590                     msg["Authorization"]["transportProtection"] == "TLS" and
591                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
592                     msg["Authorization"]["authType"] == "no bind")
593
594         creds = self.insta_creds(template=self.get_credentials())
595         creds.set_anonymous()
596
597         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
598                            lp = self.get_loadparm(),
599                            credentials=creds)
600
601         try:
602             res = self.samdb.search(base=self.samdb.domain_dn())
603             self.fail( "Expected an LdbError exception")
604         except LdbError:
605             pass
606
607         messages = self.waitForMessages( isLastExpectedMessage)
608         self.assertEquals(1,
609                           len(messages),
610                           "Did not receive the expected number of messages")
611     def test_smb(self):
612         def isLastExpectedMessage(msg):
613             return (msg["type"] == "Authorization" and
614                     msg["Authorization"]["serviceDescription"] == "SMB" and
615                     msg["Authorization"]["authType"] == "krb5" and
616                     msg["Authorization"]["transportProtection"] == "SMB")
617
618         creds = self.insta_creds(template=self.get_credentials())
619         smb.SMB(self.server,
620                 "sysvol",
621                 lp=self.get_loadparm(),
622                 creds=creds)
623
624         messages = self.waitForMessages(isLastExpectedMessage)
625         self.assertEquals(3,
626                           len(messages),
627                           "Did not receive the expected number of messages")
628         # Check the first message it should be an Authentication
629         msg = messages[0]
630         self.assertEquals("Authentication", msg["type"])
631         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
632         self.assertEquals("Kerberos KDC",
633                            msg["Authentication"]["serviceDescription"])
634         self.assertEquals("ENC-TS Pre-authentication",
635                            msg["Authentication"]["authDescription"])
636
637         # Check the second message it should be an Authentication
638         msg = messages[1]
639         self.assertEquals("Authentication", msg["type"])
640         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
641         self.assertEquals("Kerberos KDC",
642                            msg["Authentication"]["serviceDescription"])
643         self.assertEquals("ENC-TS Pre-authentication",
644                            msg["Authentication"]["authDescription"])
645
646     def test_smb_bad_password(self):
647         def isLastExpectedMessage(msg):
648             return (msg["type"] == "Authentication" and
649                     msg["Authentication"]["serviceDescription"]
650                         == "Kerberos KDC" and
651                     msg["Authentication"]["status"]
652                         == "NT_STATUS_WRONG_PASSWORD" and
653                     msg["Authentication"]["authDescription"]
654                         == "ENC-TS Pre-authentication")
655
656         creds = self.insta_creds(template=self.get_credentials())
657         creds.set_password("badPassword")
658
659         thrown = False
660         try:
661             smb.SMB(self.server,
662                     "sysvol",
663                     lp=self.get_loadparm(),
664                     creds=creds)
665         except NTSTATUSError:
666             thrown = True
667         self.assertEquals(thrown, True)
668
669         messages = self.waitForMessages(isLastExpectedMessage)
670         self.assertEquals(1,
671                           len(messages),
672                           "Did not receive the expected number of messages")
673
674
675     def test_smb_bad_user(self):
676         def isLastExpectedMessage(msg):
677             return (msg["type"] == "Authentication" and
678                     msg["Authentication"]["serviceDescription"]
679                         == "Kerberos KDC" and
680                     msg["Authentication"]["status"]
681                         == "NT_STATUS_NO_SUCH_USER" and
682                     msg["Authentication"]["authDescription"]
683                         == "ENC-TS Pre-authentication")
684
685         creds = self.insta_creds(template=self.get_credentials())
686         creds.set_username("badUser")
687
688         thrown = False
689         try:
690             smb.SMB(self.server,
691                     "sysvol",
692                     lp=self.get_loadparm(),
693                     creds=creds)
694         except NTSTATUSError:
695             thrown = True
696         self.assertEquals(thrown, True)
697
698         messages = self.waitForMessages(isLastExpectedMessage)
699         self.assertEquals(1,
700                           len(messages),
701                           "Did not receive the expected number of messages")
702
703     def test_smb1_anonymous(self):
704         def isLastExpectedMessage(msg):
705             return (msg["type"] == "Authorization" and
706                     msg["Authorization"]["serviceDescription"] == "SMB" and
707                     msg["Authorization"]["authType"] == "NTLMSSP" and
708                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
709                     msg["Authorization"]["transportProtection"] == "SMB")
710
711         server   = os.environ["SERVER"]
712
713         path = "//%s/IPC$" % server
714         auth = "-N"
715         call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
716
717         messages = self.waitForMessages(isLastExpectedMessage)
718         self.assertEquals(3,
719                           len(messages),
720                           "Did not receive the expected number of messages")
721
722         # Check the first message it should be an Authentication
723         msg = messages[0]
724         self.assertEquals("Authentication", msg["type"])
725         self.assertEquals("NT_STATUS_NO_SUCH_USER",
726                           msg["Authentication"]["status"])
727         self.assertEquals("SMB",
728                           msg["Authentication"]["serviceDescription"])
729         self.assertEquals("NTLMSSP",
730                           msg["Authentication"]["authDescription"])
731         self.assertEquals("No-Password",
732                           msg["Authentication"]["passwordType"])
733
734         # Check the second message it should be an Authentication
735         msg = messages[1]
736         self.assertEquals("Authentication", msg["type"])
737         self.assertEquals("NT_STATUS_OK",
738                           msg["Authentication"]["status"])
739         self.assertEquals("SMB",
740                           msg["Authentication"]["serviceDescription"])
741         self.assertEquals("NTLMSSP",
742                           msg["Authentication"]["authDescription"])
743         self.assertEquals("No-Password",
744                           msg["Authentication"]["passwordType"])
745         self.assertEquals("ANONYMOUS LOGON",
746                           msg["Authentication"]["becameAccount"])
747
748     def test_smb2_anonymous(self):
749         def isLastExpectedMessage(msg):
750             return (msg["type"] == "Authorization" and
751                     msg["Authorization"]["serviceDescription"] == "SMB2" and
752                     msg["Authorization"]["authType"] == "NTLMSSP" and
753                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
754                     msg["Authorization"]["transportProtection"] == "SMB")
755
756         server   = os.environ["SERVER"]
757
758         path = "//%s/IPC$" % server
759         auth = "-N"
760         call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
761
762         messages = self.waitForMessages(isLastExpectedMessage)
763         self.assertEquals(3,
764                           len(messages),
765                           "Did not receive the expected number of messages")
766
767         # Check the first message it should be an Authentication
768         msg = messages[0]
769         self.assertEquals("Authentication", msg["type"])
770         self.assertEquals("NT_STATUS_NO_SUCH_USER",
771                           msg["Authentication"]["status"])
772         self.assertEquals("SMB2",
773                           msg["Authentication"]["serviceDescription"])
774         self.assertEquals("NTLMSSP",
775                           msg["Authentication"]["authDescription"])
776         self.assertEquals("No-Password",
777                           msg["Authentication"]["passwordType"])
778
779         # Check the second message it should be an Authentication
780         msg = messages[1]
781         self.assertEquals("Authentication", msg["type"])
782         self.assertEquals("NT_STATUS_OK",
783                           msg["Authentication"]["status"])
784         self.assertEquals("SMB2",
785                           msg["Authentication"]["serviceDescription"])
786         self.assertEquals("NTLMSSP",
787                           msg["Authentication"]["authDescription"])
788         self.assertEquals("No-Password",
789                           msg["Authentication"]["passwordType"])
790         self.assertEquals("ANONYMOUS LOGON",
791                           msg["Authentication"]["becameAccount"])
792
793     def test_smb_no_krb_spnego(self):
794         def isLastExpectedMessage(msg):
795             return (msg["type"] == "Authorization" and
796                     msg["Authorization"]["serviceDescription"] == "SMB" and
797                     msg["Authorization"]["authType"] == "NTLMSSP" and
798                     msg["Authorization"]["transportProtection"] == "SMB")
799
800         creds = self.insta_creds(template=self.get_credentials(),
801                                  kerberos_state=DONT_USE_KERBEROS)
802         smb.SMB(self.server,
803                 "sysvol",
804                 lp=self.get_loadparm(),
805                 creds=creds)
806
807         messages = self.waitForMessages(isLastExpectedMessage)
808         self.assertEquals(2,
809                           len(messages),
810                           "Did not receive the expected number of messages")
811         # Check the first message it should be an Authentication
812         msg = messages[0]
813         self.assertEquals("Authentication", msg["type"])
814         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
815         self.assertEquals("SMB",
816                           msg["Authentication"]["serviceDescription"])
817         self.assertEquals("NTLMSSP",
818                           msg["Authentication"]["authDescription"])
819         self.assertEquals("NTLMv2",
820                           msg["Authentication"]["passwordType"])
821
822     def test_smb_no_krb_spnego_bad_password(self):
823         def isLastExpectedMessage(msg):
824             return (msg["type"] == "Authentication" and
825                     msg["Authentication"]["serviceDescription"] == "SMB" and
826                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
827                     msg["Authentication"]["passwordType"] == "NTLMv2" and
828                     msg["Authentication"]["status"]
829                         == "NT_STATUS_WRONG_PASSWORD")
830
831         creds = self.insta_creds(template=self.get_credentials(),
832                                  kerberos_state=DONT_USE_KERBEROS)
833         creds.set_password("badPassword")
834
835         thrown = False
836         try:
837             smb.SMB(self.server,
838                     "sysvol",
839                     lp=self.get_loadparm(),
840                     creds=creds)
841         except NTSTATUSError:
842             thrown = True
843         self.assertEquals(thrown, True)
844
845         messages = self.waitForMessages(isLastExpectedMessage)
846         self.assertEquals(1,
847                           len(messages),
848                           "Did not receive the expected number of messages")
849
850     def test_smb_no_krb_spnego_bad_user(self):
851         def isLastExpectedMessage(msg):
852             return (msg["type"] == "Authentication" and
853                     msg["Authentication"]["serviceDescription"] == "SMB" and
854                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
855                     msg["Authentication"]["passwordType"] == "NTLMv2" and
856                     msg["Authentication"]["status"]
857                         == "NT_STATUS_NO_SUCH_USER")
858
859         creds = self.insta_creds(template=self.get_credentials(),
860                                  kerberos_state=DONT_USE_KERBEROS)
861         creds.set_username("badUser")
862
863         thrown = False
864         try:
865             smb.SMB(self.server,
866                     "sysvol",
867                     lp=self.get_loadparm(),
868                     creds=creds)
869         except NTSTATUSError:
870             thrown = True
871         self.assertEquals(thrown, True)
872
873         messages = self.waitForMessages(isLastExpectedMessage)
874         self.assertEquals(1,
875                           len(messages),
876                           "Did not receive the expected number of messages")
877
878     def test_smb_no_krb_no_spnego_no_ntlmv2(self):
879         def isLastExpectedMessage(msg):
880             return (msg["type"] == "Authorization" and
881                     msg["Authorization"]["serviceDescription"] == "SMB" and
882                     msg["Authorization"]["authType"] == "bare-NTLM" and
883                     msg["Authorization"]["transportProtection"] == "SMB")
884
885         creds = self.insta_creds(template=self.get_credentials(),
886                                  kerberos_state=DONT_USE_KERBEROS)
887         smb.SMB(self.server,
888                 "sysvol",
889                 lp=self.get_loadparm(),
890                 creds=creds,
891                 ntlmv2_auth=False,
892                 use_spnego=False)
893
894         messages = self.waitForMessages(isLastExpectedMessage)
895         self.assertEquals(2,
896                           len(messages),
897                           "Did not receive the expected number of messages")
898         # Check the first message it should be an Authentication
899         msg = messages[0]
900         self.assertEquals("Authentication", msg["type"])
901         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
902         self.assertEquals("SMB",
903                           msg["Authentication"]["serviceDescription"])
904         self.assertEquals("bare-NTLM",
905                           msg["Authentication"]["authDescription"])
906         self.assertEquals("NTLMv1",
907                           msg["Authentication"]["passwordType"])
908
909     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
910         def isLastExpectedMessage(msg):
911             return (msg["type"] == "Authentication" and
912                     msg["Authentication"]["serviceDescription"] == "SMB" and
913                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
914                     msg["Authentication"]["passwordType"] == "NTLMv1" and
915                     msg["Authentication"]["status"]
916                         == "NT_STATUS_WRONG_PASSWORD")
917
918         creds = self.insta_creds(template=self.get_credentials(),
919                                  kerberos_state=DONT_USE_KERBEROS)
920         creds.set_password("badPassword")
921
922         thrown = False
923         try:
924             smb.SMB(self.server,
925                     "sysvol",
926                     lp=self.get_loadparm(),
927                     creds=creds,
928                     ntlmv2_auth=False,
929                     use_spnego=False)
930         except NTSTATUSError:
931             thrown = True
932         self.assertEquals(thrown, True)
933
934
935         messages = self.waitForMessages(isLastExpectedMessage)
936         self.assertEquals(1,
937                           len(messages),
938                           "Did not receive the expected number of messages")
939
940     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
941         def isLastExpectedMessage(msg):
942             return (msg["type"] == "Authentication" and
943                     msg["Authentication"]["serviceDescription"] == "SMB" and
944                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
945                     msg["Authentication"]["passwordType"] == "NTLMv1" and
946                     msg["Authentication"]["status"]
947                         == "NT_STATUS_NO_SUCH_USER")
948
949         creds = self.insta_creds(template=self.get_credentials(),
950                                  kerberos_state=DONT_USE_KERBEROS)
951         creds.set_username("badUser")
952
953         thrown = False
954         try:
955             smb.SMB(self.server,
956                     "sysvol",
957                     lp=self.get_loadparm(),
958                     creds=creds,
959                     ntlmv2_auth=False,
960                     use_spnego=False)
961         except NTSTATUSError:
962             thrown = True
963         self.assertEquals(thrown, True)
964
965
966         messages = self.waitForMessages(isLastExpectedMessage)
967         self.assertEquals(1,
968                           len(messages),
969                           "Did not receive the expected number of messages")
970
971     def test_samlogon_interactive(self):
972
973         workstation = "AuthLogTests"
974
975         def isLastExpectedMessage( msg):
976             return (msg["type"] == "Authentication" and
977                     msg["Authentication"]["serviceDescription"]
978                         == "SamLogon" and
979                     msg["Authentication"]["authDescription"]
980                         == "interactive" and
981                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
982                     msg["Authentication"]["workstation"]
983                         == r"\\%s" % workstation)
984
985         server   = os.environ["SERVER"]
986         user     = os.environ["USERNAME"]
987         password = os.environ["PASSWORD"]
988         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
989
990
991         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
992
993         messages = self.waitForMessages( isLastExpectedMessage)
994         messages = self.remove_netlogon_messages(messages)
995         received = len(messages)
996         self.assertIs(True,
997                       (received == 5 or received == 6),
998                       "Did not receive the expected number of messages")
999
1000     def test_samlogon_interactive_bad_password(self):
1001
1002         workstation = "AuthLogTests"
1003
1004         def isLastExpectedMessage( msg):
1005             return (msg["type"] == "Authentication" and
1006                     msg["Authentication"]["serviceDescription"]
1007                         == "SamLogon" and
1008                     msg["Authentication"]["authDescription"]
1009                         == "interactive" and
1010                     msg["Authentication"]["status"]
1011                         == "NT_STATUS_WRONG_PASSWORD" and
1012                     msg["Authentication"]["workstation"]
1013                         == r"\\%s" % workstation)
1014
1015         server   = os.environ["SERVER"]
1016         user     = os.environ["USERNAME"]
1017         password = "badPassword"
1018         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1019
1020
1021         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1022
1023         messages = self.waitForMessages( isLastExpectedMessage)
1024         messages = self.remove_netlogon_messages(messages)
1025         received = len(messages)
1026         self.assertIs(True,
1027                       (received == 5 or received == 6),
1028                       "Did not receive the expected number of messages")
1029
1030     def test_samlogon_interactive_bad_user(self):
1031
1032         workstation = "AuthLogTests"
1033
1034         def isLastExpectedMessage( msg):
1035             return (msg["type"] == "Authentication" and
1036                     msg["Authentication"]["serviceDescription"]
1037                         == "SamLogon" and
1038                     msg["Authentication"]["authDescription"]
1039                         == "interactive" and
1040                     msg["Authentication"]["status"]
1041                         == "NT_STATUS_NO_SUCH_USER" and
1042                     msg["Authentication"]["workstation"]
1043                         == r"\\%s" % workstation)
1044
1045         server   = os.environ["SERVER"]
1046         user     = "badUser"
1047         password = os.environ["PASSWORD"]
1048         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1049
1050
1051         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1052
1053         messages = self.waitForMessages( isLastExpectedMessage)
1054         messages = self.remove_netlogon_messages(messages)
1055         received = len(messages)
1056         self.assertIs(True,
1057                       (received == 5 or received == 6),
1058                       "Did not receive the expected number of messages")
1059
1060     def test_samlogon_network(self):
1061
1062         workstation = "AuthLogTests"
1063
1064         def isLastExpectedMessage( msg):
1065             return (msg["type"] == "Authentication" and
1066                     msg["Authentication"]["serviceDescription"]
1067                         == "SamLogon" and
1068                     msg["Authentication"]["authDescription"]
1069                         == "network" and
1070                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1071                     msg["Authentication"]["workstation"]
1072                         == r"\\%s" % workstation)
1073
1074         server   = os.environ["SERVER"]
1075         user     = os.environ["USERNAME"]
1076         password = os.environ["PASSWORD"]
1077         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1078
1079
1080         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1081
1082         messages = self.waitForMessages( isLastExpectedMessage)
1083         messages = self.remove_netlogon_messages(messages)
1084         received = len(messages)
1085         self.assertIs(True,
1086                       (received == 5 or received == 6),
1087                       "Did not receive the expected number of messages")
1088
1089     def test_samlogon_network_bad_password(self):
1090
1091         workstation = "AuthLogTests"
1092
1093         def isLastExpectedMessage( msg):
1094             return (msg["type"] == "Authentication" and
1095                     msg["Authentication"]["serviceDescription"]
1096                         == "SamLogon" and
1097                     msg["Authentication"]["authDescription"]
1098                         == "network" and
1099                     msg["Authentication"]["status"]
1100                         == "NT_STATUS_WRONG_PASSWORD" and
1101                     msg["Authentication"]["workstation"]
1102                         == r"\\%s" % workstation)
1103
1104         server   = os.environ["SERVER"]
1105         user     = os.environ["USERNAME"]
1106         password = "badPassword"
1107         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1108
1109
1110         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1111
1112         messages = self.waitForMessages( isLastExpectedMessage)
1113         messages = self.remove_netlogon_messages(messages)
1114         received = len(messages)
1115         self.assertIs(True,
1116                       (received == 5 or received == 6),
1117                       "Did not receive the expected number of messages")
1118
1119     def test_samlogon_network_bad_user(self):
1120
1121         workstation = "AuthLogTests"
1122
1123         def isLastExpectedMessage( msg):
1124             return (msg["type"] == "Authentication" and
1125                     msg["Authentication"]["serviceDescription"]
1126                         == "SamLogon" and
1127                     msg["Authentication"]["authDescription"]
1128                         == "network" and
1129                     msg["Authentication"]["status"]
1130                         == "NT_STATUS_NO_SUCH_USER" and
1131                     msg["Authentication"]["workstation"]
1132                         == r"\\%s" % workstation)
1133
1134         server   = os.environ["SERVER"]
1135         user     = "badUser"
1136         password =  os.environ["PASSWORD"]
1137         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1138
1139
1140         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1141
1142         messages = self.waitForMessages( isLastExpectedMessage)
1143         messages = self.remove_netlogon_messages(messages)
1144         received = len(messages)
1145         self.assertIs(True,
1146                       (received == 5 or received == 6),
1147                       "Did not receive the expected number of messages")
1148
1149     def test_samlogon_network_mschap(self):
1150
1151         workstation = "AuthLogTests"
1152
1153         def isLastExpectedMessage( msg):
1154             return (msg["type"] == "Authentication" and
1155                     msg["Authentication"]["serviceDescription"]
1156                         == "SamLogon" and
1157                     msg["Authentication"]["authDescription"]
1158                         == "network" and
1159                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1160                     msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1161                     msg["Authentication"]["workstation"]
1162                         == r"\\%s" % workstation)
1163
1164         server   = os.environ["SERVER"]
1165         user     = os.environ["USERNAME"]
1166         password = os.environ["PASSWORD"]
1167         samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
1168
1169
1170         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1171
1172         messages = self.waitForMessages( isLastExpectedMessage)
1173         messages = self.remove_netlogon_messages(messages)
1174         received = len(messages)
1175         self.assertIs(True,
1176                       (received == 5 or received == 6),
1177                       "Did not receive the expected number of messages")
1178
1179     def test_samlogon_network_mschap_bad_password(self):
1180
1181         workstation = "AuthLogTests"
1182
1183         def isLastExpectedMessage( msg):
1184             return (msg["type"] == "Authentication" and
1185                     msg["Authentication"]["serviceDescription"]
1186                         == "SamLogon" and
1187                     msg["Authentication"]["authDescription"]
1188                         == "network" and
1189                     msg["Authentication"]["status"]
1190                         == "NT_STATUS_WRONG_PASSWORD" and
1191                     msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1192                     msg["Authentication"]["workstation"]
1193                         == r"\\%s" % workstation)
1194
1195         server   = os.environ["SERVER"]
1196         user     = os.environ["USERNAME"]
1197         password = "badPassword"
1198         samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
1199
1200
1201         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1202
1203         messages = self.waitForMessages( isLastExpectedMessage)
1204         messages = self.remove_netlogon_messages(messages)
1205         received = len(messages)
1206         self.assertIs(True,
1207                       (received == 5 or received == 6),
1208                       "Did not receive the expected number of messages")
1209
1210     def test_samlogon_network_mschap_bad_user(self):
1211
1212         workstation = "AuthLogTests"
1213
1214         def isLastExpectedMessage( msg):
1215             return (msg["type"] == "Authentication" and
1216                     msg["Authentication"]["serviceDescription"]
1217                         == "SamLogon" and
1218                     msg["Authentication"]["authDescription"]
1219                         == "network" and
1220                     msg["Authentication"]["status"]
1221                         == "NT_STATUS_NO_SUCH_USER" and
1222                     msg["Authentication"]["passwordType"] == "MSCHAPv2" and
1223                     msg["Authentication"]["workstation"]
1224                         == r"\\%s" % workstation)
1225
1226         server   = os.environ["SERVER"]
1227         user     = "badUser"
1228         password = os.environ["PASSWORD"]
1229         samlogon = "samlogon %s %s %s %d 0x00010000" % (user, password, workstation, 2)
1230
1231
1232         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1233
1234         messages = self.waitForMessages( isLastExpectedMessage)
1235         messages = self.remove_netlogon_messages(messages)
1236         received = len(messages)
1237         self.assertIs(True,
1238                       (received == 5 or received == 6),
1239                       "Did not receive the expected number of messages")
1240
1241     def test_samlogon_schannel_seal(self):
1242
1243         workstation = "AuthLogTests"
1244
1245         def isLastExpectedMessage( msg):
1246             return (msg["type"] == "Authentication" and
1247                     msg["Authentication"]["serviceDescription"]
1248                         == "SamLogon" and
1249                     msg["Authentication"]["authDescription"]
1250                         == "network" and
1251                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1252                     msg["Authentication"]["workstation"]
1253                         == r"\\%s" % workstation)
1254
1255         server   = os.environ["SERVER"]
1256         user     = os.environ["USERNAME"]
1257         password = os.environ["PASSWORD"]
1258         samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
1259
1260
1261         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1262
1263         messages = self.waitForMessages( isLastExpectedMessage)
1264         messages = self.remove_netlogon_messages(messages)
1265         received = len(messages)
1266         self.assertIs(True,
1267                       (received == 5 or received == 6),
1268                       "Did not receive the expected number of messages")
1269
1270         # Check the second to last message it should be an Authorization
1271         msg = messages[-2]
1272         self.assertEquals("Authorization", msg["type"])
1273         self.assertEquals("DCE/RPC",
1274                           msg["Authorization"]["serviceDescription"])
1275         self.assertEquals("schannel",  msg["Authorization"]["authType"])
1276         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1277
    # Signed logons get promoted to sealed; this test ensures that
    # this behaviour is not removed accidentally.
1280     def test_samlogon_schannel_sign(self):
1281
1282         workstation = "AuthLogTests"
1283
1284         def isLastExpectedMessage( msg):
1285             return (msg["type"] == "Authentication" and
1286                     msg["Authentication"]["serviceDescription"]
1287                         == "SamLogon" and
1288                     msg["Authentication"]["authDescription"]
1289                         == "network" and
1290                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1291                     msg["Authentication"]["workstation"]
1292                         == r"\\%s" % workstation)
1293
1294         server   = os.environ["SERVER"]
1295         user     = os.environ["USERNAME"]
1296         password = os.environ["PASSWORD"]
1297         samlogon = "schannelsign;samlogon %s %s %s" % (user, password, workstation)
1298
1299
1300         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1301
1302         messages = self.waitForMessages( isLastExpectedMessage)
1303         messages = self.remove_netlogon_messages(messages)
1304         received = len(messages)
1305         self.assertIs(True,
1306                       (received == 5 or received == 6),
1307                       "Did not receive the expected number of messages")
1308
1309         # Check the second to last message it should be an Authorization
1310         msg = messages[-2]
1311         self.assertEquals("Authorization", msg["type"])
1312         self.assertEquals("DCE/RPC",
1313                           msg["Authorization"]["serviceDescription"])
1314         self.assertEquals("schannel",  msg["Authorization"]["authType"])
1315         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])