auth log: Add windows logon type codes
[samba.git] / python / samba / tests / auth_log.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
20 """
21 import samba.tests
22 from samba.dcerpc import srvsvc, dnsserver
23 import os
24 from samba import smb
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 from samba.dcerpc.windows_event_ids import (
32     EVT_ID_SUCCESSFUL_LOGON,
33     EVT_ID_UNSUCCESSFUL_LOGON,
34     EVT_LOGON_NETWORK,
35     EVT_LOGON_INTERACTIVE,
36     EVT_LOGON_NETWORK_CLEAR_TEXT
37 )
38 import re
39
40
41 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
42
43     def setUp(self):
44         super(AuthLogTests, self).setUp()
45         self.remoteAddress = os.environ["CLIENT_IP"]
46
47     def tearDown(self):
48         super(AuthLogTests, self).tearDown()
49
50     def _test_rpc_ncacn_np(self, authTypes, creds, service,
51                            binding, protection, checkFunction):
52         def isLastExpectedMessage(msg):
53             return (msg["type"] == "Authorization" and
54                     (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
55                      msg["Authorization"]["serviceDescription"] == service) and
56                     msg["Authorization"]["authType"] == authTypes[0] and
57                     msg["Authorization"]["transportProtection"] == protection)
58
59         if binding:
60             binding = "[%s]" % binding
61
62         if service == "dnsserver":
63             x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
64                                     self.get_loadparm(),
65                                     creds)
66         elif service == "srvsvc":
67             x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
68                               self.get_loadparm(),
69                               creds)
70
71         # The connection is passed to ensure the server
72         # messaging context stays up until all the messages have been received.
73         messages = self.waitForMessages(isLastExpectedMessage, x)
74         checkFunction(messages, authTypes, service, binding, protection)
75
76     def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
77         # Turn "[foo,bar]" into a list ("foo", "bar") to test
78         # lambda x: x removes anything that evaluates to False,
79         # including empty strings, so we handle "" as well
80         binding_list = list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
81
82         # Handle explicit smb2, smb1 or auto negotiation
83         if "smb2" in binding_list:
84             self.assertEquals(serviceDescription, "SMB2")
85         elif "smb1" in binding_list:
86             self.assertEquals(serviceDescription, "SMB")
87         else:
88             self.assertIn(serviceDescription, ["SMB", "SMB2"])
89
90     def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
91                                 binding, protection):
92
93         expected_messages = len(authTypes)
94         self.assertEquals(expected_messages,
95                           len(messages),
96                           "Did not receive the expected number of messages")
97
98         # Check the first message it should be an Authentication
99         msg = messages[0]
100         self.assertEquals("Authentication", msg["type"])
101         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
102         self.assertEquals(
103             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
104         self.assertEquals(
105             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
106         self._assert_ncacn_np_serviceDescription(binding,
107                                                  msg["Authentication"]["serviceDescription"])
108         self.assertEquals(authTypes[1],
109                           msg["Authentication"]["authDescription"])
110
111         # Check the second message it should be an Authorization
112         msg = messages[1]
113         self.assertEquals("Authorization", msg["type"])
114         self._assert_ncacn_np_serviceDescription(binding,
115                                                  msg["Authorization"]["serviceDescription"])
116         self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
117         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
118         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
119
120         # Check the third message it should be an Authentication
121         # if we are expecting 4 messages
122         if expected_messages == 4:
123             def checkServiceDescription(desc):
124                 return (desc == "DCE/RPC" or desc == service)
125
126             msg = messages[2]
127             self.assertEquals("Authentication", msg["type"])
128             self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
129             self.assertTrue(
130                 checkServiceDescription(
131                     msg["Authentication"]["serviceDescription"]))
132
133             self.assertEquals(authTypes[3],
134                               msg["Authentication"]["authDescription"])
135             self.assertEquals(
136                 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
137             self.assertEquals(
138                 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
139
140     def rpc_ncacn_np_krb5_check(
141             self,
142             messages,
143             authTypes,
144             service,
145             binding,
146             protection):
147
148         expected_messages = len(authTypes)
149         self.assertEquals(expected_messages,
150                           len(messages),
151                           "Did not receive the expected number of messages")
152
153         # Check the first message it should be an Authentication
154         # This is almost certainly Authentication over UDP, and is probably
155         # returning message too big,
156         msg = messages[0]
157         self.assertEquals("Authentication", msg["type"])
158         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
159         self.assertEquals("Kerberos KDC",
160                           msg["Authentication"]["serviceDescription"])
161         self.assertEquals(authTypes[1],
162                           msg["Authentication"]["authDescription"])
163         self.assertEquals(
164             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
165         self.assertEquals(
166             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
167
168         # Check the second message it should be an Authentication
169         # This this the TCP Authentication in response to the message too big
170         # response to the UDP Authentication
171         msg = messages[1]
172         self.assertEquals("Authentication", msg["type"])
173         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
174         self.assertEquals("Kerberos KDC",
175                           msg["Authentication"]["serviceDescription"])
176         self.assertEquals(authTypes[2],
177                           msg["Authentication"]["authDescription"])
178         self.assertEquals(
179             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
180         self.assertEquals(
181             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
182
183         # Check the third message it should be an Authorization
184         msg = messages[2]
185         self.assertEquals("Authorization", msg["type"])
186         self._assert_ncacn_np_serviceDescription(binding,
187                                                  msg["Authorization"]["serviceDescription"])
188         self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
189         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
190         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
191
192     def test_rpc_ncacn_np_ntlm_dns_sign(self):
193         creds = self.insta_creds(template=self.get_credentials(),
194                                  kerberos_state=DONT_USE_KERBEROS)
195         self._test_rpc_ncacn_np(["NTLMSSP",
196                                  "NTLMSSP",
197                                  "NTLMSSP",
198                                  "NTLMSSP"],
199                                 creds, "dnsserver", "sign", "SIGN",
200                                 self.rpc_ncacn_np_ntlm_check)
201
202     def test_rpc_ncacn_np_ntlm_srv_sign(self):
203         creds = self.insta_creds(template=self.get_credentials(),
204                                  kerberos_state=DONT_USE_KERBEROS)
205         self._test_rpc_ncacn_np(["NTLMSSP",
206                                  "NTLMSSP",
207                                  "NTLMSSP",
208                                  "NTLMSSP"],
209                                 creds, "srvsvc", "sign", "SIGN",
210                                 self.rpc_ncacn_np_ntlm_check)
211
212     def test_rpc_ncacn_np_ntlm_dns(self):
213         creds = self.insta_creds(template=self.get_credentials(),
214                                  kerberos_state=DONT_USE_KERBEROS)
215         self._test_rpc_ncacn_np(["ncacn_np",
216                                  "NTLMSSP",
217                                  "NTLMSSP"],
218                                 creds, "dnsserver", "", "SMB",
219                                 self.rpc_ncacn_np_ntlm_check)
220
221     def test_rpc_ncacn_np_ntlm_srv(self):
222         creds = self.insta_creds(template=self.get_credentials(),
223                                  kerberos_state=DONT_USE_KERBEROS)
224         self._test_rpc_ncacn_np(["ncacn_np",
225                                  "NTLMSSP",
226                                  "NTLMSSP"],
227                                 creds, "srvsvc", "", "SMB",
228                                 self.rpc_ncacn_np_ntlm_check)
229
230     def test_rpc_ncacn_np_krb_dns_sign(self):
231         creds = self.insta_creds(template=self.get_credentials(),
232                                  kerberos_state=MUST_USE_KERBEROS)
233         self._test_rpc_ncacn_np(["krb5",
234                                  "ENC-TS Pre-authentication",
235                                  "ENC-TS Pre-authentication",
236                                  "krb5"],
237                                 creds, "dnsserver", "sign", "SIGN",
238                                 self.rpc_ncacn_np_krb5_check)
239
240     def test_rpc_ncacn_np_krb_srv_sign(self):
241         creds = self.insta_creds(template=self.get_credentials(),
242                                  kerberos_state=MUST_USE_KERBEROS)
243         self._test_rpc_ncacn_np(["krb5",
244                                  "ENC-TS Pre-authentication",
245                                  "ENC-TS Pre-authentication",
246                                  "krb5"],
247                                 creds, "srvsvc", "sign", "SIGN",
248                                 self.rpc_ncacn_np_krb5_check)
249
250     def test_rpc_ncacn_np_krb_dns(self):
251         creds = self.insta_creds(template=self.get_credentials(),
252                                  kerberos_state=MUST_USE_KERBEROS)
253         self._test_rpc_ncacn_np(["ncacn_np",
254                                  "ENC-TS Pre-authentication",
255                                  "ENC-TS Pre-authentication",
256                                  "krb5"],
257                                 creds, "dnsserver", "", "SMB",
258                                 self.rpc_ncacn_np_krb5_check)
259
260     def test_rpc_ncacn_np_krb_dns_smb2(self):
261         creds = self.insta_creds(template=self.get_credentials(),
262                                  kerberos_state=MUST_USE_KERBEROS)
263         self._test_rpc_ncacn_np(["ncacn_np",
264                                  "ENC-TS Pre-authentication",
265                                  "ENC-TS Pre-authentication",
266                                  "krb5"],
267                                 creds, "dnsserver", "smb2", "SMB",
268                                 self.rpc_ncacn_np_krb5_check)
269
270     def test_rpc_ncacn_np_krb_srv(self):
271         creds = self.insta_creds(template=self.get_credentials(),
272                                  kerberos_state=MUST_USE_KERBEROS)
273         self._test_rpc_ncacn_np(["ncacn_np",
274                                  "ENC-TS Pre-authentication",
275                                  "ENC-TS Pre-authentication",
276                                  "krb5"],
277                                 creds, "srvsvc", "", "SMB",
278                                 self.rpc_ncacn_np_krb5_check)
279
280     def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
281                                binding, protection, checkFunction):
282         def isLastExpectedMessage(msg):
283             return (msg["type"] == "Authorization" and
284                     msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
285                     msg["Authorization"]["authType"] == authTypes[0] and
286                     msg["Authorization"]["transportProtection"] == protection)
287
288         if binding:
289             binding = "[%s]" % binding
290
291         if service == "dnsserver":
292             conn = dnsserver.dnsserver(
293                 "ncacn_ip_tcp:%s%s" % (self.server, binding),
294                 self.get_loadparm(),
295                 creds)
296         elif service == "srvsvc":
297             conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
298                                  self.get_loadparm(),
299                                  creds)
300
301         messages = self.waitForMessages(isLastExpectedMessage, conn)
302         checkFunction(messages, authTypes, service, binding, protection)
303
304     def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
305                                     binding, protection):
306
307         expected_messages = len(authTypes)
308         self.assertEquals(expected_messages,
309                           len(messages),
310                           "Did not receive the expected number of messages")
311
312         # Check the first message it should be an Authorization
313         msg = messages[0]
314         self.assertEquals("Authorization", msg["type"])
315         self.assertEquals("DCE/RPC",
316                           msg["Authorization"]["serviceDescription"])
317         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
318         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
319         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
320
321         # Check the second message it should be an Authentication
322         msg = messages[1]
323         self.assertEquals("Authentication", msg["type"])
324         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
325         self.assertEquals("DCE/RPC",
326                           msg["Authentication"]["serviceDescription"])
327         self.assertEquals(authTypes[2],
328                           msg["Authentication"]["authDescription"])
329         self.assertEquals(
330             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
331         self.assertEquals(
332             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
333
334     def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
335                                     binding, protection):
336
337         expected_messages = len(authTypes)
338         self.assertEquals(expected_messages,
339                           len(messages),
340                           "Did not receive the expected number of messages")
341
342         # Check the first message it should be an Authorization
343         msg = messages[0]
344         self.assertEquals("Authorization", msg["type"])
345         self.assertEquals("DCE/RPC",
346                           msg["Authorization"]["serviceDescription"])
347         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
348         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
349         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
350
351         # Check the second message it should be an Authentication
352         msg = messages[1]
353         self.assertEquals("Authentication", msg["type"])
354         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
355         self.assertEquals("Kerberos KDC",
356                           msg["Authentication"]["serviceDescription"])
357         self.assertEquals(authTypes[2],
358                           msg["Authentication"]["authDescription"])
359         self.assertEquals(
360             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
361         self.assertEquals(
362             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
363
364         # Check the third message it should be an Authentication
365         msg = messages[2]
366         self.assertEquals("Authentication", msg["type"])
367         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
368         self.assertEquals("Kerberos KDC",
369                           msg["Authentication"]["serviceDescription"])
370         self.assertEquals(authTypes[2],
371                           msg["Authentication"]["authDescription"])
372         self.assertEquals(
373             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
374         self.assertEquals(
375             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
376
377     def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
378         creds = self.insta_creds(template=self.get_credentials(),
379                                  kerberos_state=DONT_USE_KERBEROS)
380         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
381                                      "ncacn_ip_tcp",
382                                      "NTLMSSP"],
383                                     creds, "dnsserver", "sign", "SIGN",
384                                     self.rpc_ncacn_ip_tcp_ntlm_check)
385
386     def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
387         creds = self.insta_creds(template=self.get_credentials(),
388                                  kerberos_state=MUST_USE_KERBEROS)
389         self._test_rpc_ncacn_ip_tcp(["krb5",
390                                      "ncacn_ip_tcp",
391                                      "ENC-TS Pre-authentication",
392                                      "ENC-TS Pre-authentication"],
393                                     creds, "dnsserver", "sign", "SIGN",
394                                     self.rpc_ncacn_ip_tcp_krb5_check)
395
396     def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
397         creds = self.insta_creds(template=self.get_credentials(),
398                                  kerberos_state=DONT_USE_KERBEROS)
399         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
400                                      "ncacn_ip_tcp",
401                                      "NTLMSSP"],
402                                     creds, "dnsserver", "", "SIGN",
403                                     self.rpc_ncacn_ip_tcp_ntlm_check)
404
405     def test_rpc_ncacn_ip_tcp_krb5_dns(self):
406         creds = self.insta_creds(template=self.get_credentials(),
407                                  kerberos_state=MUST_USE_KERBEROS)
408         self._test_rpc_ncacn_ip_tcp(["krb5",
409                                      "ncacn_ip_tcp",
410                                      "ENC-TS Pre-authentication",
411                                      "ENC-TS Pre-authentication"],
412                                     creds, "dnsserver", "", "SIGN",
413                                     self.rpc_ncacn_ip_tcp_krb5_check)
414
415     def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
416         creds = self.insta_creds(template=self.get_credentials(),
417                                  kerberos_state=DONT_USE_KERBEROS)
418         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
419                                      "ncacn_ip_tcp",
420                                      "NTLMSSP"],
421                                     creds, "dnsserver", "connect", "NONE",
422                                     self.rpc_ncacn_ip_tcp_ntlm_check)
423
424     def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
425         creds = self.insta_creds(template=self.get_credentials(),
426                                  kerberos_state=MUST_USE_KERBEROS)
427         self._test_rpc_ncacn_ip_tcp(["krb5",
428                                      "ncacn_ip_tcp",
429                                      "ENC-TS Pre-authentication",
430                                      "ENC-TS Pre-authentication"],
431                                     creds, "dnsserver", "connect", "NONE",
432                                     self.rpc_ncacn_ip_tcp_krb5_check)
433
434     def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
435         creds = self.insta_creds(template=self.get_credentials(),
436                                  kerberos_state=DONT_USE_KERBEROS)
437         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
438                                      "ncacn_ip_tcp",
439                                      "NTLMSSP"],
440                                     creds, "dnsserver", "seal", "SEAL",
441                                     self.rpc_ncacn_ip_tcp_ntlm_check)
442
443     def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
444         creds = self.insta_creds(template=self.get_credentials(),
445                                  kerberos_state=MUST_USE_KERBEROS)
446         self._test_rpc_ncacn_ip_tcp(["krb5",
447                                      "ncacn_ip_tcp",
448                                      "ENC-TS Pre-authentication",
449                                      "ENC-TS Pre-authentication"],
450                                     creds, "dnsserver", "seal", "SEAL",
451                                     self.rpc_ncacn_ip_tcp_krb5_check)
452
453     def test_ldap(self):
454
455         def isLastExpectedMessage(msg):
456             return (msg["type"] == "Authorization" and
457                     msg["Authorization"]["serviceDescription"] == "LDAP" and
458                     msg["Authorization"]["transportProtection"] == "SIGN" and
459                     msg["Authorization"]["authType"] == "krb5")
460
461         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
462                            lp=self.get_loadparm(),
463                            credentials=self.get_credentials())
464
465         messages = self.waitForMessages(isLastExpectedMessage)
466         self.assertEquals(3,
467                           len(messages),
468                           "Did not receive the expected number of messages")
469
470         # Check the first message it should be an Authentication
471         msg = messages[0]
472         self.assertEquals("Authentication", msg["type"])
473         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
474         self.assertEquals("Kerberos KDC",
475                           msg["Authentication"]["serviceDescription"])
476         self.assertEquals("ENC-TS Pre-authentication",
477                           msg["Authentication"]["authDescription"])
478         self.assertTrue(msg["Authentication"]["duration"] > 0)
479         self.assertEquals(
480             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
481         self.assertEquals(
482             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
483
484         # Check the second message it should be an Authentication
485         msg = messages[1]
486         self.assertEquals("Authentication", msg["type"])
487         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
488         self.assertEquals("Kerberos KDC",
489                           msg["Authentication"]["serviceDescription"])
490         self.assertEquals("ENC-TS Pre-authentication",
491                           msg["Authentication"]["authDescription"])
492         self.assertTrue(msg["Authentication"]["duration"] > 0)
493         self.assertEquals(
494             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
495         self.assertEquals(
496             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
497
498     def test_ldap_ntlm(self):
499
500         def isLastExpectedMessage(msg):
501             return (msg["type"] == "Authorization" and
502                     msg["Authorization"]["serviceDescription"] == "LDAP" and
503                     msg["Authorization"]["transportProtection"] == "SEAL" and
504                     msg["Authorization"]["authType"] == "NTLMSSP")
505
506         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
507                            lp=self.get_loadparm(),
508                            credentials=self.get_credentials())
509
510         messages = self.waitForMessages(isLastExpectedMessage)
511         self.assertEquals(2,
512                           len(messages),
513                           "Did not receive the expected number of messages")
514         # Check the first message it should be an Authentication
515         msg = messages[0]
516         self.assertEquals("Authentication", msg["type"])
517         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
518         self.assertEquals("LDAP",
519                           msg["Authentication"]["serviceDescription"])
520         self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
521         self.assertTrue(msg["Authentication"]["duration"] > 0)
522         self.assertEquals(
523             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
524         self.assertEquals(
525             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
526
527     def test_ldap_simple_bind(self):
528         def isLastExpectedMessage(msg):
529             return (msg["type"] == "Authorization" and
530                     msg["Authorization"]["serviceDescription"] == "LDAP" and
531                     msg["Authorization"]["transportProtection"] == "TLS" and
532                     msg["Authorization"]["authType"] == "simple bind")
533
534         creds = self.insta_creds(template=self.get_credentials())
535         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
536                                       creds.get_username()))
537
538         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
539                            lp=self.get_loadparm(),
540                            credentials=creds)
541
542         messages = self.waitForMessages(isLastExpectedMessage)
543         self.assertEquals(2,
544                           len(messages),
545                           "Did not receive the expected number of messages")
546
547         # Check the first message it should be an Authentication
548         msg = messages[0]
549         self.assertEquals("Authentication", msg["type"])
550         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
551         self.assertEquals("LDAP",
552                           msg["Authentication"]["serviceDescription"])
553         self.assertEquals("simple bind",
554                           msg["Authentication"]["authDescription"])
555         self.assertEquals(
556             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
557         self.assertEquals(
558             EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
559
560     def test_ldap_simple_bind_bad_password(self):
561         def isLastExpectedMessage(msg):
562             return (msg["type"] == "Authentication" and
563                     msg["Authentication"]["serviceDescription"] == "LDAP" and
564                     (msg["Authentication"]["status"] ==
565                         "NT_STATUS_WRONG_PASSWORD") and
566                     (msg["Authentication"]["authDescription"] ==
567                         "simple bind") and
568                     (msg["Authentication"]["eventId"] ==
569                         EVT_ID_UNSUCCESSFUL_LOGON) and
570                     (msg["Authentication"]["logonType"] ==
571                         EVT_LOGON_NETWORK_CLEAR_TEXT))
572
573         creds = self.insta_creds(template=self.get_credentials())
574         creds.set_password("badPassword")
575         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
576                                       creds.get_username()))
577
578         thrown = False
579         try:
580             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
581                                lp=self.get_loadparm(),
582                                credentials=creds)
583         except LdbError:
584             thrown = True
585         self.assertEquals(thrown, True)
586
587         messages = self.waitForMessages(isLastExpectedMessage)
588         self.assertEquals(1,
589                           len(messages),
590                           "Did not receive the expected number of messages")
591
592     def test_ldap_simple_bind_bad_user(self):
593         def isLastExpectedMessage(msg):
594             return (msg["type"] == "Authentication" and
595                     msg["Authentication"]["serviceDescription"] == "LDAP" and
596                     (msg["Authentication"]["status"] ==
597                         "NT_STATUS_NO_SUCH_USER") and
598                     (msg["Authentication"]["authDescription"] ==
599                         "simple bind") and
600                     (msg["Authentication"]["eventId"] ==
601                         EVT_ID_UNSUCCESSFUL_LOGON) and
602                     (msg["Authentication"]["logonType"] ==
603                         EVT_LOGON_NETWORK_CLEAR_TEXT))
604
605         creds = self.insta_creds(template=self.get_credentials())
606         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
607
608         thrown = False
609         try:
610             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
611                                lp=self.get_loadparm(),
612                                credentials=creds)
613         except LdbError:
614             thrown = True
615         self.assertEquals(thrown, True)
616
617         messages = self.waitForMessages(isLastExpectedMessage)
618         self.assertEquals(1,
619                           len(messages),
620                           "Did not receive the expected number of messages")
621
622     def test_ldap_simple_bind_unparseable_user(self):
623         def isLastExpectedMessage(msg):
624             return (msg["type"] == "Authentication" and
625                     msg["Authentication"]["serviceDescription"] == "LDAP" and
626                     (msg["Authentication"]["status"] ==
627                         "NT_STATUS_NO_SUCH_USER") and
628                     (msg["Authentication"]["authDescription"] ==
629                         "simple bind") and
630                     (msg["Authentication"]["eventId"] ==
631                         EVT_ID_UNSUCCESSFUL_LOGON) and
632                     (msg["Authentication"]["logonType"] ==
633                         EVT_LOGON_NETWORK_CLEAR_TEXT))
634
635         creds = self.insta_creds(template=self.get_credentials())
636         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
637
638         thrown = False
639         try:
640             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
641                                lp=self.get_loadparm(),
642                                credentials=creds)
643         except LdbError:
644             thrown = True
645         self.assertEquals(thrown, True)
646
647         messages = self.waitForMessages(isLastExpectedMessage)
648         self.assertEquals(1,
649                           len(messages),
650                           "Did not receive the expected number of messages")
651
652     #
653     # Note: as this test does not expect any messages it will
654     #       time out in the call to self.waitForMessages.
655     #       This is expected, but it will slow this test.
656     def test_ldap_anonymous_access_bind_only(self):
657         # Should be no logging for anonymous bind
658         # so receiving any message indicates a failure.
659         def isLastExpectedMessage(msg):
660             return True
661
662         creds = self.insta_creds(template=self.get_credentials())
663         creds.set_anonymous()
664
665         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
666                            lp=self.get_loadparm(),
667                            credentials=creds)
668
669         messages = self.waitForMessages(isLastExpectedMessage)
670         self.assertEquals(0,
671                           len(messages),
672                           "Did not receive the expected number of messages")
673
674     def test_ldap_anonymous_access(self):
675         def isLastExpectedMessage(msg):
676             return (msg["type"] == "Authorization" and
677                     msg["Authorization"]["serviceDescription"]  == "LDAP" and
678                     msg["Authorization"]["transportProtection"] == "TLS" and
679                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
680                     msg["Authorization"]["authType"] == "no bind")
681
682         creds = self.insta_creds(template=self.get_credentials())
683         creds.set_anonymous()
684
685         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
686                            lp=self.get_loadparm(),
687                            credentials=creds)
688
689         try:
690             self.samdb.search(base=self.samdb.domain_dn())
691             self.fail("Expected an LdbError exception")
692         except LdbError:
693             pass
694
695         messages = self.waitForMessages(isLastExpectedMessage)
696         self.assertEquals(1,
697                           len(messages),
698                           "Did not receive the expected number of messages")
699
700     def test_smb(self):
701         def isLastExpectedMessage(msg):
702             return (msg["type"] == "Authorization" and
703                     msg["Authorization"]["serviceDescription"] == "SMB" and
704                     msg["Authorization"]["authType"] == "krb5" and
705                     msg["Authorization"]["transportProtection"] == "SMB")
706
707         creds = self.insta_creds(template=self.get_credentials())
708         smb.SMB(self.server,
709                 "sysvol",
710                 lp=self.get_loadparm(),
711                 creds=creds)
712
713         messages = self.waitForMessages(isLastExpectedMessage)
714         self.assertEquals(3,
715                           len(messages),
716                           "Did not receive the expected number of messages")
717         # Check the first message it should be an Authentication
718         msg = messages[0]
719         self.assertEquals("Authentication", msg["type"])
720         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
721         self.assertEquals("Kerberos KDC",
722                           msg["Authentication"]["serviceDescription"])
723         self.assertEquals("ENC-TS Pre-authentication",
724                           msg["Authentication"]["authDescription"])
725         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
726                           msg["Authentication"]["eventId"])
727         self.assertEquals(EVT_LOGON_NETWORK,
728                           msg["Authentication"]["logonType"])
729
730         # Check the second message it should be an Authentication
731         msg = messages[1]
732         self.assertEquals("Authentication", msg["type"])
733         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
734         self.assertEquals("Kerberos KDC",
735                           msg["Authentication"]["serviceDescription"])
736         self.assertEquals("ENC-TS Pre-authentication",
737                           msg["Authentication"]["authDescription"])
738         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
739                           msg["Authentication"]["eventId"])
740         self.assertEquals(EVT_LOGON_NETWORK,
741                           msg["Authentication"]["logonType"])
742
743     def test_smb_bad_password(self):
744         def isLastExpectedMessage(msg):
745             return (msg["type"] == "Authentication" and
746                     (msg["Authentication"]["serviceDescription"] ==
747                         "Kerberos KDC") and
748                     (msg["Authentication"]["status"] ==
749                         "NT_STATUS_WRONG_PASSWORD") and
750                     (msg["Authentication"]["authDescription"] ==
751                         "ENC-TS Pre-authentication"))
752
753         creds = self.insta_creds(template=self.get_credentials())
754         creds.set_password("badPassword")
755
756         thrown = False
757         try:
758             smb.SMB(self.server,
759                     "sysvol",
760                     lp=self.get_loadparm(),
761                     creds=creds)
762         except NTSTATUSError:
763             thrown = True
764         self.assertEquals(thrown, True)
765
766         messages = self.waitForMessages(isLastExpectedMessage)
767         self.assertEquals(1,
768                           len(messages),
769                           "Did not receive the expected number of messages")
770
771     def test_smb_bad_user(self):
772         def isLastExpectedMessage(msg):
773             return (msg["type"] == "Authentication" and
774                     (msg["Authentication"]["serviceDescription"] ==
775                         "Kerberos KDC") and
776                     (msg["Authentication"]["status"] ==
777                         "NT_STATUS_NO_SUCH_USER") and
778                     (msg["Authentication"]["authDescription"] ==
779                         "ENC-TS Pre-authentication") and
780                     (msg["Authentication"]["eventId"] ==
781                         EVT_ID_UNSUCCESSFUL_LOGON) and
782                     (msg["Authentication"]["logonType"] ==
783                         EVT_LOGON_NETWORK))
784
785         creds = self.insta_creds(template=self.get_credentials())
786         creds.set_username("badUser")
787
788         thrown = False
789         try:
790             smb.SMB(self.server,
791                     "sysvol",
792                     lp=self.get_loadparm(),
793                     creds=creds)
794         except NTSTATUSError:
795             thrown = True
796         self.assertEquals(thrown, True)
797
798         messages = self.waitForMessages(isLastExpectedMessage)
799         self.assertEquals(1,
800                           len(messages),
801                           "Did not receive the expected number of messages")
802
803     def test_smb1_anonymous(self):
804         def isLastExpectedMessage(msg):
805             return (msg["type"] == "Authorization" and
806                     msg["Authorization"]["serviceDescription"] == "SMB" and
807                     msg["Authorization"]["authType"] == "NTLMSSP" and
808                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
809                     msg["Authorization"]["transportProtection"] == "SMB")
810
811         server   = os.environ["SERVER"]
812
813         path = "//%s/IPC$" % server
814         auth = "-N"
815         call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
816
817         messages = self.waitForMessages(isLastExpectedMessage)
818         self.assertEquals(3,
819                           len(messages),
820                           "Did not receive the expected number of messages")
821
822         # Check the first message it should be an Authentication
823         msg = messages[0]
824         self.assertEquals("Authentication", msg["type"])
825         self.assertEquals("NT_STATUS_NO_SUCH_USER",
826                           msg["Authentication"]["status"])
827         self.assertEquals("SMB",
828                           msg["Authentication"]["serviceDescription"])
829         self.assertEquals("NTLMSSP",
830                           msg["Authentication"]["authDescription"])
831         self.assertEquals("No-Password",
832                           msg["Authentication"]["passwordType"])
833         self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
834                           msg["Authentication"]["eventId"])
835         self.assertEquals(EVT_LOGON_NETWORK,
836                           msg["Authentication"]["logonType"])
837
838         # Check the second message it should be an Authentication
839         msg = messages[1]
840         self.assertEquals("Authentication", msg["type"])
841         self.assertEquals("NT_STATUS_OK",
842                           msg["Authentication"]["status"])
843         self.assertEquals("SMB",
844                           msg["Authentication"]["serviceDescription"])
845         self.assertEquals("NTLMSSP",
846                           msg["Authentication"]["authDescription"])
847         self.assertEquals("No-Password",
848                           msg["Authentication"]["passwordType"])
849         self.assertEquals("ANONYMOUS LOGON",
850                           msg["Authentication"]["becameAccount"])
851         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
852                           msg["Authentication"]["eventId"])
853         self.assertEquals(EVT_LOGON_NETWORK,
854                           msg["Authentication"]["logonType"])
855
856     def test_smb2_anonymous(self):
857         def isLastExpectedMessage(msg):
858             return (msg["type"] == "Authorization" and
859                     msg["Authorization"]["serviceDescription"] == "SMB2" and
860                     msg["Authorization"]["authType"] == "NTLMSSP" and
861                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
862                     msg["Authorization"]["transportProtection"] == "SMB")
863
864         server   = os.environ["SERVER"]
865
866         path = "//%s/IPC$" % server
867         auth = "-N"
868         call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
869
870         messages = self.waitForMessages(isLastExpectedMessage)
871         self.assertEquals(3,
872                           len(messages),
873                           "Did not receive the expected number of messages")
874
875         # Check the first message it should be an Authentication
876         msg = messages[0]
877         self.assertEquals("Authentication", msg["type"])
878         self.assertEquals("NT_STATUS_NO_SUCH_USER",
879                           msg["Authentication"]["status"])
880         self.assertEquals("SMB2",
881                           msg["Authentication"]["serviceDescription"])
882         self.assertEquals("NTLMSSP",
883                           msg["Authentication"]["authDescription"])
884         self.assertEquals("No-Password",
885                           msg["Authentication"]["passwordType"])
886         self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
887                           msg["Authentication"]["eventId"])
888         self.assertEquals(EVT_LOGON_NETWORK,
889                           msg["Authentication"]["logonType"])
890
891         # Check the second message it should be an Authentication
892         msg = messages[1]
893         self.assertEquals("Authentication", msg["type"])
894         self.assertEquals("NT_STATUS_OK",
895                           msg["Authentication"]["status"])
896         self.assertEquals("SMB2",
897                           msg["Authentication"]["serviceDescription"])
898         self.assertEquals("NTLMSSP",
899                           msg["Authentication"]["authDescription"])
900         self.assertEquals("No-Password",
901                           msg["Authentication"]["passwordType"])
902         self.assertEquals("ANONYMOUS LOGON",
903                           msg["Authentication"]["becameAccount"])
904         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
905                           msg["Authentication"]["eventId"])
906         self.assertEquals(EVT_LOGON_NETWORK,
907                           msg["Authentication"]["logonType"])
908
909     def test_smb_no_krb_spnego(self):
910         def isLastExpectedMessage(msg):
911             return (msg["type"] == "Authorization" and
912                     msg["Authorization"]["serviceDescription"] == "SMB" and
913                     msg["Authorization"]["authType"] == "NTLMSSP" and
914                     msg["Authorization"]["transportProtection"] == "SMB")
915
916         creds = self.insta_creds(template=self.get_credentials(),
917                                  kerberos_state=DONT_USE_KERBEROS)
918         smb.SMB(self.server,
919                 "sysvol",
920                 lp=self.get_loadparm(),
921                 creds=creds)
922
923         messages = self.waitForMessages(isLastExpectedMessage)
924         self.assertEquals(2,
925                           len(messages),
926                           "Did not receive the expected number of messages")
927         # Check the first message it should be an Authentication
928         msg = messages[0]
929         self.assertEquals("Authentication", msg["type"])
930         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
931         self.assertEquals("SMB",
932                           msg["Authentication"]["serviceDescription"])
933         self.assertEquals("NTLMSSP",
934                           msg["Authentication"]["authDescription"])
935         self.assertEquals("NTLMv2",
936                           msg["Authentication"]["passwordType"])
937         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
938                           msg["Authentication"]["eventId"])
939         self.assertEquals(EVT_LOGON_NETWORK,
940                           msg["Authentication"]["logonType"])
941
942     def test_smb_no_krb_spnego_bad_password(self):
943         def isLastExpectedMessage(msg):
944             return (msg["type"] == "Authentication" and
945                     msg["Authentication"]["serviceDescription"] == "SMB" and
946                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
947                     msg["Authentication"]["passwordType"] == "NTLMv2" and
948                     (msg["Authentication"]["status"] ==
949                         "NT_STATUS_WRONG_PASSWORD") and
950                     (msg["Authentication"]["eventId"] ==
951                         EVT_ID_UNSUCCESSFUL_LOGON) and
952                     (msg["Authentication"]["logonType"] ==
953                         EVT_LOGON_NETWORK))
954
955         creds = self.insta_creds(template=self.get_credentials(),
956                                  kerberos_state=DONT_USE_KERBEROS)
957         creds.set_password("badPassword")
958
959         thrown = False
960         try:
961             smb.SMB(self.server,
962                     "sysvol",
963                     lp=self.get_loadparm(),
964                     creds=creds)
965         except NTSTATUSError:
966             thrown = True
967         self.assertEquals(thrown, True)
968
969         messages = self.waitForMessages(isLastExpectedMessage)
970         self.assertEquals(1,
971                           len(messages),
972                           "Did not receive the expected number of messages")
973
974     def test_smb_no_krb_spnego_bad_user(self):
975         def isLastExpectedMessage(msg):
976             return (msg["type"] == "Authentication" and
977                     msg["Authentication"]["serviceDescription"] == "SMB" and
978                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
979                     msg["Authentication"]["passwordType"] == "NTLMv2" and
980                     (msg["Authentication"]["status"] ==
981                         "NT_STATUS_NO_SUCH_USER") and
982                     (msg["Authentication"]["eventId"] ==
983                         EVT_ID_UNSUCCESSFUL_LOGON) and
984                     (msg["Authentication"]["logonType"] ==
985                         EVT_LOGON_NETWORK))
986
987         creds = self.insta_creds(template=self.get_credentials(),
988                                  kerberos_state=DONT_USE_KERBEROS)
989         creds.set_username("badUser")
990
991         thrown = False
992         try:
993             smb.SMB(self.server,
994                     "sysvol",
995                     lp=self.get_loadparm(),
996                     creds=creds)
997         except NTSTATUSError:
998             thrown = True
999         self.assertEquals(thrown, True)
1000
1001         messages = self.waitForMessages(isLastExpectedMessage)
1002         self.assertEquals(1,
1003                           len(messages),
1004                           "Did not receive the expected number of messages")
1005
1006     def test_smb_no_krb_no_spnego_no_ntlmv2(self):
1007         def isLastExpectedMessage(msg):
1008             return (msg["type"] == "Authorization" and
1009                     msg["Authorization"]["serviceDescription"] == "SMB" and
1010                     msg["Authorization"]["authType"] == "bare-NTLM" and
1011                     msg["Authorization"]["transportProtection"] == "SMB")
1012
1013         creds = self.insta_creds(template=self.get_credentials(),
1014                                  kerberos_state=DONT_USE_KERBEROS)
1015         smb.SMB(self.server,
1016                 "sysvol",
1017                 lp=self.get_loadparm(),
1018                 creds=creds,
1019                 ntlmv2_auth=False,
1020                 use_spnego=False)
1021
1022         messages = self.waitForMessages(isLastExpectedMessage)
1023         self.assertEquals(2,
1024                           len(messages),
1025                           "Did not receive the expected number of messages")
1026         # Check the first message it should be an Authentication
1027         msg = messages[0]
1028         self.assertEquals("Authentication", msg["type"])
1029         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
1030         self.assertEquals("SMB",
1031                           msg["Authentication"]["serviceDescription"])
1032         self.assertEquals("bare-NTLM",
1033                           msg["Authentication"]["authDescription"])
1034         self.assertEquals("NTLMv1",
1035                           msg["Authentication"]["passwordType"])
1036         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
1037                           msg["Authentication"]["eventId"])
1038         self.assertEquals(EVT_LOGON_NETWORK,
1039                           msg["Authentication"]["logonType"])
1040
1041     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
1042         def isLastExpectedMessage(msg):
1043             return (msg["type"] == "Authentication" and
1044                     msg["Authentication"]["serviceDescription"] == "SMB" and
1045                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1046                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1047                     (msg["Authentication"]["status"] ==
1048                         "NT_STATUS_WRONG_PASSWORD") and
1049                     (msg["Authentication"]["eventId"] ==
1050                         EVT_ID_UNSUCCESSFUL_LOGON) and
1051                     (msg["Authentication"]["logonType"] ==
1052                         EVT_LOGON_NETWORK))
1053
1054         creds = self.insta_creds(template=self.get_credentials(),
1055                                  kerberos_state=DONT_USE_KERBEROS)
1056         creds.set_password("badPassword")
1057
1058         thrown = False
1059         try:
1060             smb.SMB(self.server,
1061                     "sysvol",
1062                     lp=self.get_loadparm(),
1063                     creds=creds,
1064                     ntlmv2_auth=False,
1065                     use_spnego=False)
1066         except NTSTATUSError:
1067             thrown = True
1068         self.assertEquals(thrown, True)
1069
1070         messages = self.waitForMessages(isLastExpectedMessage)
1071         self.assertEquals(1,
1072                           len(messages),
1073                           "Did not receive the expected number of messages")
1074
1075     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
1076         def isLastExpectedMessage(msg):
1077             return (msg["type"] == "Authentication" and
1078                     msg["Authentication"]["serviceDescription"] == "SMB" and
1079                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1080                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1081                     (msg["Authentication"]["status"] ==
1082                         "NT_STATUS_NO_SUCH_USER") and
1083                     (msg["Authentication"]["eventId"] ==
1084                         EVT_ID_UNSUCCESSFUL_LOGON) and
1085                     (msg["Authentication"]["logonType"] ==
1086                         EVT_LOGON_NETWORK))
1087
1088         creds = self.insta_creds(template=self.get_credentials(),
1089                                  kerberos_state=DONT_USE_KERBEROS)
1090         creds.set_username("badUser")
1091
1092         thrown = False
1093         try:
1094             smb.SMB(self.server,
1095                     "sysvol",
1096                     lp=self.get_loadparm(),
1097                     creds=creds,
1098                     ntlmv2_auth=False,
1099                     use_spnego=False)
1100         except NTSTATUSError:
1101             thrown = True
1102         self.assertEquals(thrown, True)
1103
1104         messages = self.waitForMessages(isLastExpectedMessage)
1105         self.assertEquals(1,
1106                           len(messages),
1107                           "Did not receive the expected number of messages")
1108
1109     def test_samlogon_interactive(self):
1110
1111         workstation = "AuthLogTests"
1112
1113         def isLastExpectedMessage(msg):
1114             return (msg["type"] == "Authentication" and
1115                     (msg["Authentication"]["serviceDescription"] ==
1116                         "SamLogon") and
1117                     (msg["Authentication"]["authDescription"] ==
1118                         "interactive") and
1119                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1120                     (msg["Authentication"]["workstation"] ==
1121                         r"\\%s" % workstation) and
1122                     (msg["Authentication"]["eventId"] ==
1123                         EVT_ID_SUCCESSFUL_LOGON) and
1124                     (msg["Authentication"]["logonType"] ==
1125                         EVT_LOGON_INTERACTIVE))
1126
1127         server   = os.environ["SERVER"]
1128         user     = os.environ["USERNAME"]
1129         password = os.environ["PASSWORD"]
1130         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1131
1132         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1133
1134         messages = self.waitForMessages(isLastExpectedMessage)
1135         messages = self.remove_netlogon_messages(messages)
1136         received = len(messages)
1137         self.assertIs(True,
1138                       (received == 5 or received == 6),
1139                       "Did not receive the expected number of messages")
1140
1141     def test_samlogon_interactive_bad_password(self):
1142
1143         workstation = "AuthLogTests"
1144
1145         def isLastExpectedMessage(msg):
1146             return (msg["type"] == "Authentication" and
1147                     (msg["Authentication"]["serviceDescription"] ==
1148                         "SamLogon") and
1149                     (msg["Authentication"]["authDescription"] ==
1150                         "interactive") and
1151                     (msg["Authentication"]["status"] ==
1152                         "NT_STATUS_WRONG_PASSWORD") and
1153                     (msg["Authentication"]["workstation"] ==
1154                         r"\\%s" % workstation) and
1155                     (msg["Authentication"]["eventId"] ==
1156                         EVT_ID_UNSUCCESSFUL_LOGON) and
1157                     (msg["Authentication"]["logonType"] ==
1158                         EVT_LOGON_INTERACTIVE))
1159
1160         server   = os.environ["SERVER"]
1161         user     = os.environ["USERNAME"]
1162         password = "badPassword"
1163         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1164
1165         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1166
1167         messages = self.waitForMessages(isLastExpectedMessage)
1168         messages = self.remove_netlogon_messages(messages)
1169         received = len(messages)
1170         self.assertIs(True,
1171                       (received == 5 or received == 6),
1172                       "Did not receive the expected number of messages")
1173
1174     def test_samlogon_interactive_bad_user(self):
1175
1176         workstation = "AuthLogTests"
1177
1178         def isLastExpectedMessage(msg):
1179             return (msg["type"] == "Authentication" and
1180                     (msg["Authentication"]["serviceDescription"] ==
1181                         "SamLogon") and
1182                     (msg["Authentication"]["authDescription"] ==
1183                         "interactive") and
1184                     (msg["Authentication"]["status"] ==
1185                         "NT_STATUS_NO_SUCH_USER") and
1186                     (msg["Authentication"]["workstation"] ==
1187                         r"\\%s" % workstation) and
1188                     (msg["Authentication"]["eventId"] ==
1189                         EVT_ID_UNSUCCESSFUL_LOGON) and
1190                     (msg["Authentication"]["logonType"] ==
1191                         EVT_LOGON_INTERACTIVE))
1192
1193         server   = os.environ["SERVER"]
1194         user     = "badUser"
1195         password = os.environ["PASSWORD"]
1196         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1197
1198         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1199
1200         messages = self.waitForMessages(isLastExpectedMessage)
1201         messages = self.remove_netlogon_messages(messages)
1202         received = len(messages)
1203         self.assertIs(True,
1204                       (received == 5 or received == 6),
1205                       "Did not receive the expected number of messages")
1206
1207     def test_samlogon_network(self):
1208
1209         workstation = "AuthLogTests"
1210
1211         def isLastExpectedMessage(msg):
1212             return (msg["type"] == "Authentication" and
1213                     (msg["Authentication"]["serviceDescription"] ==
1214                         "SamLogon") and
1215                     msg["Authentication"]["authDescription"] == "network" and
1216                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1217                     (msg["Authentication"]["workstation"] ==
1218                         r"\\%s" % workstation) and
1219                     (msg["Authentication"]["eventId"] ==
1220                         EVT_ID_SUCCESSFUL_LOGON) and
1221                     (msg["Authentication"]["logonType"] ==
1222                         EVT_LOGON_NETWORK))
1223
1224         server   = os.environ["SERVER"]
1225         user     = os.environ["USERNAME"]
1226         password = os.environ["PASSWORD"]
1227         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1228
1229         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1230
1231         messages = self.waitForMessages(isLastExpectedMessage)
1232         messages = self.remove_netlogon_messages(messages)
1233         received = len(messages)
1234         self.assertIs(True,
1235                       (received == 5 or received == 6),
1236                       "Did not receive the expected number of messages")
1237
1238     def test_samlogon_network_bad_password(self):
1239
1240         workstation = "AuthLogTests"
1241
1242         def isLastExpectedMessage(msg):
1243             return (msg["type"] == "Authentication" and
1244                     (msg["Authentication"]["serviceDescription"] ==
1245                         "SamLogon") and
1246                     msg["Authentication"]["authDescription"] == "network" and
1247                     (msg["Authentication"]["status"] ==
1248                         "NT_STATUS_WRONG_PASSWORD") and
1249                     (msg["Authentication"]["workstation"] ==
1250                         r"\\%s" % workstation) and
1251                     (msg["Authentication"]["eventId"] ==
1252                         EVT_ID_UNSUCCESSFUL_LOGON) and
1253                     (msg["Authentication"]["logonType"] ==
1254                         EVT_LOGON_NETWORK))
1255
1256         server   = os.environ["SERVER"]
1257         user     = os.environ["USERNAME"]
1258         password = "badPassword"
1259         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1260
1261         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1262
1263         messages = self.waitForMessages(isLastExpectedMessage)
1264         messages = self.remove_netlogon_messages(messages)
1265         received = len(messages)
1266         self.assertIs(True,
1267                       (received == 5 or received == 6),
1268                       "Did not receive the expected number of messages")
1269
1270     def test_samlogon_network_bad_user(self):
1271
1272         workstation = "AuthLogTests"
1273
1274         def isLastExpectedMessage(msg):
1275             return ((msg["type"] == "Authentication") and
1276                     (msg["Authentication"]["serviceDescription"] ==
1277                         "SamLogon") and
1278                     (msg["Authentication"]["authDescription"] == "network") and
1279                     (msg["Authentication"]["status"] ==
1280                         "NT_STATUS_NO_SUCH_USER") and
1281                     (msg["Authentication"]["workstation"] ==
1282                         r"\\%s" % workstation) and
1283                     (msg["Authentication"]["eventId"] ==
1284                         EVT_ID_UNSUCCESSFUL_LOGON) and
1285                     (msg["Authentication"]["logonType"] ==
1286                         EVT_LOGON_NETWORK))
1287
1288         server   = os.environ["SERVER"]
1289         user     = "badUser"
1290         password = os.environ["PASSWORD"]
1291         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1292
1293         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1294
1295         messages = self.waitForMessages(isLastExpectedMessage)
1296         messages = self.remove_netlogon_messages(messages)
1297         received = len(messages)
1298         self.assertIs(True,
1299                       (received == 5 or received == 6),
1300                       "Did not receive the expected number of messages")
1301
1302     def test_samlogon_network_mschap(self):
1303
1304         workstation = "AuthLogTests"
1305
1306         def isLastExpectedMessage(msg):
1307             return ((msg["type"] == "Authentication") and
1308                     (msg["Authentication"]["serviceDescription"] ==
1309                         "SamLogon") and
1310                     (msg["Authentication"]["authDescription"] == "network") and
1311                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1312                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1313                     (msg["Authentication"]["workstation"] ==
1314                         r"\\%s" % workstation) and
1315                     (msg["Authentication"]["eventId"] ==
1316                         EVT_ID_SUCCESSFUL_LOGON) and
1317                     (msg["Authentication"]["logonType"] ==
1318                         EVT_LOGON_NETWORK))
1319
1320         server   = os.environ["SERVER"]
1321         user     = os.environ["USERNAME"]
1322         password = os.environ["PASSWORD"]
1323         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1324             user, password, workstation, 2)
1325
1326         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1327
1328         messages = self.waitForMessages(isLastExpectedMessage)
1329         messages = self.remove_netlogon_messages(messages)
1330         received = len(messages)
1331         self.assertIs(True,
1332                       (received == 5 or received == 6),
1333                       "Did not receive the expected number of messages")
1334
1335     def test_samlogon_network_mschap_bad_password(self):
1336
1337         workstation = "AuthLogTests"
1338
1339         def isLastExpectedMessage(msg):
1340             return ((msg["type"] == "Authentication") and
1341                     (msg["Authentication"]["serviceDescription"] ==
1342                         "SamLogon") and
1343                     (msg["Authentication"]["authDescription"] == "network") and
1344                     (msg["Authentication"]["status"] ==
1345                         "NT_STATUS_WRONG_PASSWORD") and
1346                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1347                     (msg["Authentication"]["workstation"] ==
1348                         r"\\%s" % workstation) and
1349                     (msg["Authentication"]["eventId"] ==
1350                         EVT_ID_UNSUCCESSFUL_LOGON) and
1351                     (msg["Authentication"]["logonType"] ==
1352                         EVT_LOGON_NETWORK))
1353
1354         server   = os.environ["SERVER"]
1355         user     = os.environ["USERNAME"]
1356         password = "badPassword"
1357         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1358             user, password, workstation, 2)
1359
1360         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1361
1362         messages = self.waitForMessages(isLastExpectedMessage)
1363         messages = self.remove_netlogon_messages(messages)
1364         received = len(messages)
1365         self.assertIs(True,
1366                       (received == 5 or received == 6),
1367                       "Did not receive the expected number of messages")
1368
1369     def test_samlogon_network_mschap_bad_user(self):
1370
1371         workstation = "AuthLogTests"
1372
1373         def isLastExpectedMessage(msg):
1374             return ((msg["type"] == "Authentication") and
1375                     (msg["Authentication"]["serviceDescription"] ==
1376                         "SamLogon") and
1377                     (msg["Authentication"]["authDescription"] == "network") and
1378                     (msg["Authentication"]["status"] ==
1379                         "NT_STATUS_NO_SUCH_USER") and
1380                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1381                     (msg["Authentication"]["workstation"] ==
1382                         r"\\%s" % workstation) and
1383                     (msg["Authentication"]["eventId"] ==
1384                         EVT_ID_UNSUCCESSFUL_LOGON) and
1385                     (msg["Authentication"]["logonType"] ==
1386                         EVT_LOGON_NETWORK))
1387
1388         server   = os.environ["SERVER"]
1389         user     = "badUser"
1390         password = os.environ["PASSWORD"]
1391         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1392             user, password, workstation, 2)
1393
1394         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1395
1396         messages = self.waitForMessages(isLastExpectedMessage)
1397         messages = self.remove_netlogon_messages(messages)
1398         received = len(messages)
1399         self.assertIs(True,
1400                       (received == 5 or received == 6),
1401                       "Did not receive the expected number of messages")
1402
1403     def test_samlogon_schannel_seal(self):
1404
1405         workstation = "AuthLogTests"
1406
1407         def isLastExpectedMessage(msg):
1408             return ((msg["type"] == "Authentication") and
1409                     (msg["Authentication"]["serviceDescription"] ==
1410                         "SamLogon") and
1411                     (msg["Authentication"]["authDescription"] == "network") and
1412                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1413                     (msg["Authentication"]["workstation"] ==
1414                         r"\\%s" % workstation) and
1415                     (msg["Authentication"]["eventId"] ==
1416                         EVT_ID_SUCCESSFUL_LOGON) and
1417                     (msg["Authentication"]["logonType"] ==
1418                         EVT_LOGON_NETWORK))
1419
1420         server   = os.environ["SERVER"]
1421         user     = os.environ["USERNAME"]
1422         password = os.environ["PASSWORD"]
1423         samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
1424
1425         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1426
1427         messages = self.waitForMessages(isLastExpectedMessage)
1428         messages = self.remove_netlogon_messages(messages)
1429         received = len(messages)
1430         self.assertIs(True,
1431                       (received == 5 or received == 6),
1432                       "Did not receive the expected number of messages")
1433
1434         # Check the second to last message it should be an Authorization
1435         msg = messages[-2]
1436         self.assertEquals("Authorization", msg["type"])
1437         self.assertEquals("DCE/RPC",
1438                           msg["Authorization"]["serviceDescription"])
1439         self.assertEquals("schannel", msg["Authorization"]["authType"])
1440         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1441         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1442
1443     # Signed logons get promoted to sealed, this test ensures that
1444     # this behaviour is not removed accidentally
1445     def test_samlogon_schannel_sign(self):
1446
1447         workstation = "AuthLogTests"
1448
1449         def isLastExpectedMessage(msg):
1450             return ((msg["type"] == "Authentication") and
1451                     (msg["Authentication"]["serviceDescription"] ==
1452                         "SamLogon") and
1453                     (msg["Authentication"]["authDescription"] == "network") and
1454                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1455                     (msg["Authentication"]["workstation"] ==
1456                         r"\\%s" % workstation) and
1457                     (msg["Authentication"]["eventId"] ==
1458                         EVT_ID_SUCCESSFUL_LOGON) and
1459                     (msg["Authentication"]["logonType"] ==
1460                         EVT_LOGON_NETWORK))
1461
1462         server   = os.environ["SERVER"]
1463         user     = os.environ["USERNAME"]
1464         password = os.environ["PASSWORD"]
1465         samlogon = "schannelsign;samlogon %s %s %s" % (
1466             user, password, workstation)
1467
1468         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1469
1470         messages = self.waitForMessages(isLastExpectedMessage)
1471         messages = self.remove_netlogon_messages(messages)
1472         received = len(messages)
1473         self.assertIs(True,
1474                       (received == 5 or received == 6),
1475                       "Did not receive the expected number of messages")
1476
1477         # Check the second to last message it should be an Authorization
1478         msg = messages[-2]
1479         self.assertEquals("Authorization", msg["type"])
1480         self.assertEquals("DCE/RPC",
1481                           msg["Authorization"]["serviceDescription"])
1482         self.assertEquals("schannel", msg["Authorization"]["authType"])
1483         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1484         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))