traffic: new version of model with packet_rate, version number
[samba.git] / python / samba / tests / auth_log.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
20 """
21 import samba.tests
22 from samba.dcerpc import srvsvc, dnsserver
23 import os
24 from samba import smb
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 from samba.dcerpc.windows_event_ids import (
32     EVT_ID_SUCCESSFUL_LOGON,
33     EVT_ID_UNSUCCESSFUL_LOGON,
34     EVT_LOGON_NETWORK,
35     EVT_LOGON_INTERACTIVE,
36     EVT_LOGON_NETWORK_CLEAR_TEXT
37 )
38 import re
39
40
41 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
42
43     def setUp(self):
44         super(AuthLogTests, self).setUp()
45         self.remoteAddress = os.environ["CLIENT_IP"]
46
47     def tearDown(self):
48         super(AuthLogTests, self).tearDown()
49
50     def _test_rpc_ncacn_np(self, authTypes, creds, service,
51                            binding, protection, checkFunction):
52         def isLastExpectedMessage(msg):
53             return (msg["type"] == "Authorization" and
54                     (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
55                      msg["Authorization"]["serviceDescription"] == service) and
56                     msg["Authorization"]["authType"] == authTypes[0] and
57                     msg["Authorization"]["transportProtection"] == protection)
58
59         if binding:
60             binding = "[%s]" % binding
61
62         if service == "dnsserver":
63             x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
64                                     self.get_loadparm(),
65                                     creds)
66         elif service == "srvsvc":
67             x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
68                               self.get_loadparm(),
69                               creds)
70
71         # The connection is passed to ensure the server
72         # messaging context stays up until all the messages have been received.
73         messages = self.waitForMessages(isLastExpectedMessage, x)
74         checkFunction(messages, authTypes, service, binding, protection)
75
76     def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
77         # Turn "[foo,bar]" into a list ("foo", "bar") to test
78         # lambda x: x removes anything that evaluates to False,
79         # including empty strings, so we handle "" as well
80         binding_list = \
81             list(filter(lambda x: x, re.compile('[\[,\]]').split(binding)))
82
83         # Handle explicit smb2, smb1 or auto negotiation
84         if "smb2" in binding_list:
85             self.assertEquals(serviceDescription, "SMB2")
86         elif "smb1" in binding_list:
87             self.assertEquals(serviceDescription, "SMB")
88         else:
89             self.assertIn(serviceDescription, ["SMB", "SMB2"])
90
91     def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
92                                 binding, protection):
93
94         expected_messages = len(authTypes)
95         self.assertEquals(expected_messages,
96                           len(messages),
97                           "Did not receive the expected number of messages")
98
99         # Check the first message it should be an Authentication
100         msg = messages[0]
101         self.assertEquals("Authentication", msg["type"])
102         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
103         self.assertEquals(
104             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
105         self.assertEquals(
106             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
107         self._assert_ncacn_np_serviceDescription(
108             binding, msg["Authentication"]["serviceDescription"])
109         self.assertEquals(authTypes[1],
110                           msg["Authentication"]["authDescription"])
111
112         # Check the second message it should be an Authorization
113         msg = messages[1]
114         self.assertEquals("Authorization", msg["type"])
115         self._assert_ncacn_np_serviceDescription(
116             binding, msg["Authorization"]["serviceDescription"])
117         self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
118         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
119         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
120
121         # Check the third message it should be an Authentication
122         # if we are expecting 4 messages
123         if expected_messages == 4:
124             def checkServiceDescription(desc):
125                 return (desc == "DCE/RPC" or desc == service)
126
127             msg = messages[2]
128             self.assertEquals("Authentication", msg["type"])
129             self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
130             self.assertTrue(
131                 checkServiceDescription(
132                     msg["Authentication"]["serviceDescription"]))
133
134             self.assertEquals(authTypes[3],
135                               msg["Authentication"]["authDescription"])
136             self.assertEquals(
137                 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
138             self.assertEquals(
139                 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
140
141     def rpc_ncacn_np_krb5_check(
142             self,
143             messages,
144             authTypes,
145             service,
146             binding,
147             protection):
148
149         expected_messages = len(authTypes)
150         self.assertEquals(expected_messages,
151                           len(messages),
152                           "Did not receive the expected number of messages")
153
154         # Check the first message it should be an Authentication
155         # This is almost certainly Authentication over UDP, and is probably
156         # returning message too big,
157         msg = messages[0]
158         self.assertEquals("Authentication", msg["type"])
159         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
160         self.assertEquals("Kerberos KDC",
161                           msg["Authentication"]["serviceDescription"])
162         self.assertEquals(authTypes[1],
163                           msg["Authentication"]["authDescription"])
164         self.assertEquals(
165             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
166         self.assertEquals(
167             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
168
169         # Check the second message it should be an Authentication
170         # This this the TCP Authentication in response to the message too big
171         # response to the UDP Authentication
172         msg = messages[1]
173         self.assertEquals("Authentication", msg["type"])
174         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
175         self.assertEquals("Kerberos KDC",
176                           msg["Authentication"]["serviceDescription"])
177         self.assertEquals(authTypes[2],
178                           msg["Authentication"]["authDescription"])
179         self.assertEquals(
180             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
181         self.assertEquals(
182             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
183
184         # Check the third message it should be an Authorization
185         msg = messages[2]
186         self.assertEquals("Authorization", msg["type"])
187         self._assert_ncacn_np_serviceDescription(
188             binding, msg["Authorization"]["serviceDescription"])
189         self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
190         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
191         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
192
193     def test_rpc_ncacn_np_ntlm_dns_sign(self):
194         creds = self.insta_creds(template=self.get_credentials(),
195                                  kerberos_state=DONT_USE_KERBEROS)
196         self._test_rpc_ncacn_np(["NTLMSSP",
197                                  "NTLMSSP",
198                                  "NTLMSSP",
199                                  "NTLMSSP"],
200                                 creds, "dnsserver", "sign", "SIGN",
201                                 self.rpc_ncacn_np_ntlm_check)
202
203     def test_rpc_ncacn_np_ntlm_srv_sign(self):
204         creds = self.insta_creds(template=self.get_credentials(),
205                                  kerberos_state=DONT_USE_KERBEROS)
206         self._test_rpc_ncacn_np(["NTLMSSP",
207                                  "NTLMSSP",
208                                  "NTLMSSP",
209                                  "NTLMSSP"],
210                                 creds, "srvsvc", "sign", "SIGN",
211                                 self.rpc_ncacn_np_ntlm_check)
212
213     def test_rpc_ncacn_np_ntlm_dns(self):
214         creds = self.insta_creds(template=self.get_credentials(),
215                                  kerberos_state=DONT_USE_KERBEROS)
216         self._test_rpc_ncacn_np(["ncacn_np",
217                                  "NTLMSSP",
218                                  "NTLMSSP"],
219                                 creds, "dnsserver", "", "SMB",
220                                 self.rpc_ncacn_np_ntlm_check)
221
222     def test_rpc_ncacn_np_ntlm_srv(self):
223         creds = self.insta_creds(template=self.get_credentials(),
224                                  kerberos_state=DONT_USE_KERBEROS)
225         self._test_rpc_ncacn_np(["ncacn_np",
226                                  "NTLMSSP",
227                                  "NTLMSSP"],
228                                 creds, "srvsvc", "", "SMB",
229                                 self.rpc_ncacn_np_ntlm_check)
230
231     def test_rpc_ncacn_np_krb_dns_sign(self):
232         creds = self.insta_creds(template=self.get_credentials(),
233                                  kerberos_state=MUST_USE_KERBEROS)
234         self._test_rpc_ncacn_np(["krb5",
235                                  "ENC-TS Pre-authentication",
236                                  "ENC-TS Pre-authentication",
237                                  "krb5"],
238                                 creds, "dnsserver", "sign", "SIGN",
239                                 self.rpc_ncacn_np_krb5_check)
240
241     def test_rpc_ncacn_np_krb_srv_sign(self):
242         creds = self.insta_creds(template=self.get_credentials(),
243                                  kerberos_state=MUST_USE_KERBEROS)
244         self._test_rpc_ncacn_np(["krb5",
245                                  "ENC-TS Pre-authentication",
246                                  "ENC-TS Pre-authentication",
247                                  "krb5"],
248                                 creds, "srvsvc", "sign", "SIGN",
249                                 self.rpc_ncacn_np_krb5_check)
250
251     def test_rpc_ncacn_np_krb_dns(self):
252         creds = self.insta_creds(template=self.get_credentials(),
253                                  kerberos_state=MUST_USE_KERBEROS)
254         self._test_rpc_ncacn_np(["ncacn_np",
255                                  "ENC-TS Pre-authentication",
256                                  "ENC-TS Pre-authentication",
257                                  "krb5"],
258                                 creds, "dnsserver", "", "SMB",
259                                 self.rpc_ncacn_np_krb5_check)
260
261     def test_rpc_ncacn_np_krb_dns_smb2(self):
262         creds = self.insta_creds(template=self.get_credentials(),
263                                  kerberos_state=MUST_USE_KERBEROS)
264         self._test_rpc_ncacn_np(["ncacn_np",
265                                  "ENC-TS Pre-authentication",
266                                  "ENC-TS Pre-authentication",
267                                  "krb5"],
268                                 creds, "dnsserver", "smb2", "SMB",
269                                 self.rpc_ncacn_np_krb5_check)
270
271     def test_rpc_ncacn_np_krb_srv(self):
272         creds = self.insta_creds(template=self.get_credentials(),
273                                  kerberos_state=MUST_USE_KERBEROS)
274         self._test_rpc_ncacn_np(["ncacn_np",
275                                  "ENC-TS Pre-authentication",
276                                  "ENC-TS Pre-authentication",
277                                  "krb5"],
278                                 creds, "srvsvc", "", "SMB",
279                                 self.rpc_ncacn_np_krb5_check)
280
281     def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
282                                binding, protection, checkFunction):
283         def isLastExpectedMessage(msg):
284             return (msg["type"] == "Authorization" and
285                     msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
286                     msg["Authorization"]["authType"] == authTypes[0] and
287                     msg["Authorization"]["transportProtection"] == protection)
288
289         if binding:
290             binding = "[%s]" % binding
291
292         if service == "dnsserver":
293             conn = dnsserver.dnsserver(
294                 "ncacn_ip_tcp:%s%s" % (self.server, binding),
295                 self.get_loadparm(),
296                 creds)
297         elif service == "srvsvc":
298             conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
299                                  self.get_loadparm(),
300                                  creds)
301
302         messages = self.waitForMessages(isLastExpectedMessage, conn)
303         checkFunction(messages, authTypes, service, binding, protection)
304
305     def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
306                                     binding, protection):
307
308         expected_messages = len(authTypes)
309         self.assertEquals(expected_messages,
310                           len(messages),
311                           "Did not receive the expected number of messages")
312
313         # Check the first message it should be an Authorization
314         msg = messages[0]
315         self.assertEquals("Authorization", msg["type"])
316         self.assertEquals("DCE/RPC",
317                           msg["Authorization"]["serviceDescription"])
318         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
319         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
320         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
321
322         # Check the second message it should be an Authentication
323         msg = messages[1]
324         self.assertEquals("Authentication", msg["type"])
325         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
326         self.assertEquals("DCE/RPC",
327                           msg["Authentication"]["serviceDescription"])
328         self.assertEquals(authTypes[2],
329                           msg["Authentication"]["authDescription"])
330         self.assertEquals(
331             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
332         self.assertEquals(
333             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
334
335     def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
336                                     binding, protection):
337
338         expected_messages = len(authTypes)
339         self.assertEquals(expected_messages,
340                           len(messages),
341                           "Did not receive the expected number of messages")
342
343         # Check the first message it should be an Authorization
344         msg = messages[0]
345         self.assertEquals("Authorization", msg["type"])
346         self.assertEquals("DCE/RPC",
347                           msg["Authorization"]["serviceDescription"])
348         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
349         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
350         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
351
352         # Check the second message it should be an Authentication
353         msg = messages[1]
354         self.assertEquals("Authentication", msg["type"])
355         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
356         self.assertEquals("Kerberos KDC",
357                           msg["Authentication"]["serviceDescription"])
358         self.assertEquals(authTypes[2],
359                           msg["Authentication"]["authDescription"])
360         self.assertEquals(
361             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
362         self.assertEquals(
363             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
364
365         # Check the third message it should be an Authentication
366         msg = messages[2]
367         self.assertEquals("Authentication", msg["type"])
368         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
369         self.assertEquals("Kerberos KDC",
370                           msg["Authentication"]["serviceDescription"])
371         self.assertEquals(authTypes[2],
372                           msg["Authentication"]["authDescription"])
373         self.assertEquals(
374             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
375         self.assertEquals(
376             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
377
378     def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
379         creds = self.insta_creds(template=self.get_credentials(),
380                                  kerberos_state=DONT_USE_KERBEROS)
381         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
382                                      "ncacn_ip_tcp",
383                                      "NTLMSSP"],
384                                     creds, "dnsserver", "sign", "SIGN",
385                                     self.rpc_ncacn_ip_tcp_ntlm_check)
386
387     def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
388         creds = self.insta_creds(template=self.get_credentials(),
389                                  kerberos_state=MUST_USE_KERBEROS)
390         self._test_rpc_ncacn_ip_tcp(["krb5",
391                                      "ncacn_ip_tcp",
392                                      "ENC-TS Pre-authentication",
393                                      "ENC-TS Pre-authentication"],
394                                     creds, "dnsserver", "sign", "SIGN",
395                                     self.rpc_ncacn_ip_tcp_krb5_check)
396
397     def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
398         creds = self.insta_creds(template=self.get_credentials(),
399                                  kerberos_state=DONT_USE_KERBEROS)
400         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
401                                      "ncacn_ip_tcp",
402                                      "NTLMSSP"],
403                                     creds, "dnsserver", "", "SIGN",
404                                     self.rpc_ncacn_ip_tcp_ntlm_check)
405
406     def test_rpc_ncacn_ip_tcp_krb5_dns(self):
407         creds = self.insta_creds(template=self.get_credentials(),
408                                  kerberos_state=MUST_USE_KERBEROS)
409         self._test_rpc_ncacn_ip_tcp(["krb5",
410                                      "ncacn_ip_tcp",
411                                      "ENC-TS Pre-authentication",
412                                      "ENC-TS Pre-authentication"],
413                                     creds, "dnsserver", "", "SIGN",
414                                     self.rpc_ncacn_ip_tcp_krb5_check)
415
416     def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
417         creds = self.insta_creds(template=self.get_credentials(),
418                                  kerberos_state=DONT_USE_KERBEROS)
419         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
420                                      "ncacn_ip_tcp",
421                                      "NTLMSSP"],
422                                     creds, "dnsserver", "connect", "NONE",
423                                     self.rpc_ncacn_ip_tcp_ntlm_check)
424
425     def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
426         creds = self.insta_creds(template=self.get_credentials(),
427                                  kerberos_state=MUST_USE_KERBEROS)
428         self._test_rpc_ncacn_ip_tcp(["krb5",
429                                      "ncacn_ip_tcp",
430                                      "ENC-TS Pre-authentication",
431                                      "ENC-TS Pre-authentication"],
432                                     creds, "dnsserver", "connect", "NONE",
433                                     self.rpc_ncacn_ip_tcp_krb5_check)
434
435     def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
436         creds = self.insta_creds(template=self.get_credentials(),
437                                  kerberos_state=DONT_USE_KERBEROS)
438         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
439                                      "ncacn_ip_tcp",
440                                      "NTLMSSP"],
441                                     creds, "dnsserver", "seal", "SEAL",
442                                     self.rpc_ncacn_ip_tcp_ntlm_check)
443
444     def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
445         creds = self.insta_creds(template=self.get_credentials(),
446                                  kerberos_state=MUST_USE_KERBEROS)
447         self._test_rpc_ncacn_ip_tcp(["krb5",
448                                      "ncacn_ip_tcp",
449                                      "ENC-TS Pre-authentication",
450                                      "ENC-TS Pre-authentication"],
451                                     creds, "dnsserver", "seal", "SEAL",
452                                     self.rpc_ncacn_ip_tcp_krb5_check)
453
454     def test_ldap(self):
455
456         def isLastExpectedMessage(msg):
457             return (msg["type"] == "Authorization" and
458                     msg["Authorization"]["serviceDescription"] == "LDAP" and
459                     msg["Authorization"]["transportProtection"] == "SIGN" and
460                     msg["Authorization"]["authType"] == "krb5")
461
462         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
463                            lp=self.get_loadparm(),
464                            credentials=self.get_credentials())
465
466         messages = self.waitForMessages(isLastExpectedMessage)
467         self.assertEquals(3,
468                           len(messages),
469                           "Did not receive the expected number of messages")
470
471         # Check the first message it should be an Authentication
472         msg = messages[0]
473         self.assertEquals("Authentication", msg["type"])
474         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
475         self.assertEquals("Kerberos KDC",
476                           msg["Authentication"]["serviceDescription"])
477         self.assertEquals("ENC-TS Pre-authentication",
478                           msg["Authentication"]["authDescription"])
479         self.assertTrue(msg["Authentication"]["duration"] > 0)
480         self.assertEquals(
481             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
482         self.assertEquals(
483             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
484
485         # Check the second message it should be an Authentication
486         msg = messages[1]
487         self.assertEquals("Authentication", msg["type"])
488         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
489         self.assertEquals("Kerberos KDC",
490                           msg["Authentication"]["serviceDescription"])
491         self.assertEquals("ENC-TS Pre-authentication",
492                           msg["Authentication"]["authDescription"])
493         self.assertTrue(msg["Authentication"]["duration"] > 0)
494         self.assertEquals(
495             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
496         self.assertEquals(
497             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
498
499     def test_ldap_ntlm(self):
500
501         def isLastExpectedMessage(msg):
502             return (msg["type"] == "Authorization" and
503                     msg["Authorization"]["serviceDescription"] == "LDAP" and
504                     msg["Authorization"]["transportProtection"] == "SEAL" and
505                     msg["Authorization"]["authType"] == "NTLMSSP")
506
507         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
508                            lp=self.get_loadparm(),
509                            credentials=self.get_credentials())
510
511         messages = self.waitForMessages(isLastExpectedMessage)
512         self.assertEquals(2,
513                           len(messages),
514                           "Did not receive the expected number of messages")
515         # Check the first message it should be an Authentication
516         msg = messages[0]
517         self.assertEquals("Authentication", msg["type"])
518         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
519         self.assertEquals("LDAP",
520                           msg["Authentication"]["serviceDescription"])
521         self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
522         self.assertTrue(msg["Authentication"]["duration"] > 0)
523         self.assertEquals(
524             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
525         self.assertEquals(
526             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
527
528     def test_ldap_simple_bind(self):
529         def isLastExpectedMessage(msg):
530             return (msg["type"] == "Authorization" and
531                     msg["Authorization"]["serviceDescription"] == "LDAP" and
532                     msg["Authorization"]["transportProtection"] == "TLS" and
533                     msg["Authorization"]["authType"] == "simple bind")
534
535         creds = self.insta_creds(template=self.get_credentials())
536         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
537                                       creds.get_username()))
538
539         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
540                            lp=self.get_loadparm(),
541                            credentials=creds)
542
543         messages = self.waitForMessages(isLastExpectedMessage)
544         self.assertEquals(2,
545                           len(messages),
546                           "Did not receive the expected number of messages")
547
548         # Check the first message it should be an Authentication
549         msg = messages[0]
550         self.assertEquals("Authentication", msg["type"])
551         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
552         self.assertEquals("LDAP",
553                           msg["Authentication"]["serviceDescription"])
554         self.assertEquals("simple bind",
555                           msg["Authentication"]["authDescription"])
556         self.assertEquals(
557             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
558         self.assertEquals(
559             EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
560
561     def test_ldap_simple_bind_bad_password(self):
562         def isLastExpectedMessage(msg):
563             return (msg["type"] == "Authentication" and
564                     msg["Authentication"]["serviceDescription"] == "LDAP" and
565                     (msg["Authentication"]["status"] ==
566                         "NT_STATUS_WRONG_PASSWORD") and
567                     (msg["Authentication"]["authDescription"] ==
568                         "simple bind") and
569                     (msg["Authentication"]["eventId"] ==
570                         EVT_ID_UNSUCCESSFUL_LOGON) and
571                     (msg["Authentication"]["logonType"] ==
572                         EVT_LOGON_NETWORK_CLEAR_TEXT))
573
574         creds = self.insta_creds(template=self.get_credentials())
575         creds.set_password("badPassword")
576         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
577                                       creds.get_username()))
578
579         thrown = False
580         try:
581             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
582                                lp=self.get_loadparm(),
583                                credentials=creds)
584         except LdbError:
585             thrown = True
586         self.assertEquals(thrown, True)
587
588         messages = self.waitForMessages(isLastExpectedMessage)
589         self.assertEquals(1,
590                           len(messages),
591                           "Did not receive the expected number of messages")
592
593     def test_ldap_simple_bind_bad_user(self):
594         def isLastExpectedMessage(msg):
595             return (msg["type"] == "Authentication" and
596                     msg["Authentication"]["serviceDescription"] == "LDAP" and
597                     (msg["Authentication"]["status"] ==
598                         "NT_STATUS_NO_SUCH_USER") and
599                     (msg["Authentication"]["authDescription"] ==
600                         "simple bind") and
601                     (msg["Authentication"]["eventId"] ==
602                         EVT_ID_UNSUCCESSFUL_LOGON) and
603                     (msg["Authentication"]["logonType"] ==
604                         EVT_LOGON_NETWORK_CLEAR_TEXT))
605
606         creds = self.insta_creds(template=self.get_credentials())
607         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
608
609         thrown = False
610         try:
611             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
612                                lp=self.get_loadparm(),
613                                credentials=creds)
614         except LdbError:
615             thrown = True
616         self.assertEquals(thrown, True)
617
618         messages = self.waitForMessages(isLastExpectedMessage)
619         self.assertEquals(1,
620                           len(messages),
621                           "Did not receive the expected number of messages")
622
623     def test_ldap_simple_bind_unparseable_user(self):
624         def isLastExpectedMessage(msg):
625             return (msg["type"] == "Authentication" and
626                     msg["Authentication"]["serviceDescription"] == "LDAP" and
627                     (msg["Authentication"]["status"] ==
628                         "NT_STATUS_NO_SUCH_USER") and
629                     (msg["Authentication"]["authDescription"] ==
630                         "simple bind") and
631                     (msg["Authentication"]["eventId"] ==
632                         EVT_ID_UNSUCCESSFUL_LOGON) and
633                     (msg["Authentication"]["logonType"] ==
634                         EVT_LOGON_NETWORK_CLEAR_TEXT))
635
636         creds = self.insta_creds(template=self.get_credentials())
637         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
638
639         thrown = False
640         try:
641             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
642                                lp=self.get_loadparm(),
643                                credentials=creds)
644         except LdbError:
645             thrown = True
646         self.assertEquals(thrown, True)
647
648         messages = self.waitForMessages(isLastExpectedMessage)
649         self.assertEquals(1,
650                           len(messages),
651                           "Did not receive the expected number of messages")
652
653     #
654     # Note: as this test does not expect any messages it will
655     #       time out in the call to self.waitForMessages.
656     #       This is expected, but it will slow this test.
657     def test_ldap_anonymous_access_bind_only(self):
658         # Should be no logging for anonymous bind
659         # so receiving any message indicates a failure.
660         def isLastExpectedMessage(msg):
661             return True
662
663         creds = self.insta_creds(template=self.get_credentials())
664         creds.set_anonymous()
665
666         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
667                            lp=self.get_loadparm(),
668                            credentials=creds)
669
670         messages = self.waitForMessages(isLastExpectedMessage)
671         self.assertEquals(0,
672                           len(messages),
673                           "Did not receive the expected number of messages")
674
675     def test_ldap_anonymous_access(self):
676         def isLastExpectedMessage(msg):
677             return (msg["type"] == "Authorization" and
678                     msg["Authorization"]["serviceDescription"] == "LDAP" and
679                     msg["Authorization"]["transportProtection"] == "TLS" and
680                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
681                     msg["Authorization"]["authType"] == "no bind")
682
683         creds = self.insta_creds(template=self.get_credentials())
684         creds.set_anonymous()
685
686         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
687                            lp=self.get_loadparm(),
688                            credentials=creds)
689
690         try:
691             self.samdb.search(base=self.samdb.domain_dn())
692             self.fail("Expected an LdbError exception")
693         except LdbError:
694             pass
695
696         messages = self.waitForMessages(isLastExpectedMessage)
697         self.assertEquals(1,
698                           len(messages),
699                           "Did not receive the expected number of messages")
700
701     def test_smb(self):
702         def isLastExpectedMessage(msg):
703             return (msg["type"] == "Authorization" and
704                     msg["Authorization"]["serviceDescription"] == "SMB" and
705                     msg["Authorization"]["authType"] == "krb5" and
706                     msg["Authorization"]["transportProtection"] == "SMB")
707
708         creds = self.insta_creds(template=self.get_credentials())
709         smb.SMB(self.server,
710                 "sysvol",
711                 lp=self.get_loadparm(),
712                 creds=creds)
713
714         messages = self.waitForMessages(isLastExpectedMessage)
715         self.assertEquals(3,
716                           len(messages),
717                           "Did not receive the expected number of messages")
718         # Check the first message it should be an Authentication
719         msg = messages[0]
720         self.assertEquals("Authentication", msg["type"])
721         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
722         self.assertEquals("Kerberos KDC",
723                           msg["Authentication"]["serviceDescription"])
724         self.assertEquals("ENC-TS Pre-authentication",
725                           msg["Authentication"]["authDescription"])
726         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
727                           msg["Authentication"]["eventId"])
728         self.assertEquals(EVT_LOGON_NETWORK,
729                           msg["Authentication"]["logonType"])
730
731         # Check the second message it should be an Authentication
732         msg = messages[1]
733         self.assertEquals("Authentication", msg["type"])
734         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
735         self.assertEquals("Kerberos KDC",
736                           msg["Authentication"]["serviceDescription"])
737         self.assertEquals("ENC-TS Pre-authentication",
738                           msg["Authentication"]["authDescription"])
739         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
740                           msg["Authentication"]["eventId"])
741         self.assertEquals(EVT_LOGON_NETWORK,
742                           msg["Authentication"]["logonType"])
743
744     def test_smb_bad_password(self):
745         def isLastExpectedMessage(msg):
746             return (msg["type"] == "Authentication" and
747                     (msg["Authentication"]["serviceDescription"] ==
748                         "Kerberos KDC") and
749                     (msg["Authentication"]["status"] ==
750                         "NT_STATUS_WRONG_PASSWORD") and
751                     (msg["Authentication"]["authDescription"] ==
752                         "ENC-TS Pre-authentication"))
753
754         creds = self.insta_creds(template=self.get_credentials())
755         creds.set_password("badPassword")
756
757         thrown = False
758         try:
759             smb.SMB(self.server,
760                     "sysvol",
761                     lp=self.get_loadparm(),
762                     creds=creds)
763         except NTSTATUSError:
764             thrown = True
765         self.assertEquals(thrown, True)
766
767         messages = self.waitForMessages(isLastExpectedMessage)
768         self.assertEquals(1,
769                           len(messages),
770                           "Did not receive the expected number of messages")
771
772     def test_smb_bad_user(self):
773         def isLastExpectedMessage(msg):
774             return (msg["type"] == "Authentication" and
775                     (msg["Authentication"]["serviceDescription"] ==
776                         "Kerberos KDC") and
777                     (msg["Authentication"]["status"] ==
778                         "NT_STATUS_NO_SUCH_USER") and
779                     (msg["Authentication"]["authDescription"] ==
780                         "ENC-TS Pre-authentication") and
781                     (msg["Authentication"]["eventId"] ==
782                         EVT_ID_UNSUCCESSFUL_LOGON) and
783                     (msg["Authentication"]["logonType"] ==
784                         EVT_LOGON_NETWORK))
785
786         creds = self.insta_creds(template=self.get_credentials())
787         creds.set_username("badUser")
788
789         thrown = False
790         try:
791             smb.SMB(self.server,
792                     "sysvol",
793                     lp=self.get_loadparm(),
794                     creds=creds)
795         except NTSTATUSError:
796             thrown = True
797         self.assertEquals(thrown, True)
798
799         messages = self.waitForMessages(isLastExpectedMessage)
800         self.assertEquals(1,
801                           len(messages),
802                           "Did not receive the expected number of messages")
803
804     def test_smb1_anonymous(self):
805         def isLastExpectedMessage(msg):
806             return (msg["type"] == "Authorization" and
807                     msg["Authorization"]["serviceDescription"] == "SMB" and
808                     msg["Authorization"]["authType"] == "NTLMSSP" and
809                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
810                     msg["Authorization"]["transportProtection"] == "SMB")
811
812         server = os.environ["SERVER"]
813
814         path = "//%s/IPC$" % server
815         auth = "-N"
816         call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
817
818         messages = self.waitForMessages(isLastExpectedMessage)
819         self.assertEquals(3,
820                           len(messages),
821                           "Did not receive the expected number of messages")
822
823         # Check the first message it should be an Authentication
824         msg = messages[0]
825         self.assertEquals("Authentication", msg["type"])
826         self.assertEquals("NT_STATUS_NO_SUCH_USER",
827                           msg["Authentication"]["status"])
828         self.assertEquals("SMB",
829                           msg["Authentication"]["serviceDescription"])
830         self.assertEquals("NTLMSSP",
831                           msg["Authentication"]["authDescription"])
832         self.assertEquals("No-Password",
833                           msg["Authentication"]["passwordType"])
834         self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
835                           msg["Authentication"]["eventId"])
836         self.assertEquals(EVT_LOGON_NETWORK,
837                           msg["Authentication"]["logonType"])
838
839         # Check the second message it should be an Authentication
840         msg = messages[1]
841         self.assertEquals("Authentication", msg["type"])
842         self.assertEquals("NT_STATUS_OK",
843                           msg["Authentication"]["status"])
844         self.assertEquals("SMB",
845                           msg["Authentication"]["serviceDescription"])
846         self.assertEquals("NTLMSSP",
847                           msg["Authentication"]["authDescription"])
848         self.assertEquals("No-Password",
849                           msg["Authentication"]["passwordType"])
850         self.assertEquals("ANONYMOUS LOGON",
851                           msg["Authentication"]["becameAccount"])
852         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
853                           msg["Authentication"]["eventId"])
854         self.assertEquals(EVT_LOGON_NETWORK,
855                           msg["Authentication"]["logonType"])
856
857     def test_smb2_anonymous(self):
858         def isLastExpectedMessage(msg):
859             return (msg["type"] == "Authorization" and
860                     msg["Authorization"]["serviceDescription"] == "SMB2" and
861                     msg["Authorization"]["authType"] == "NTLMSSP" and
862                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
863                     msg["Authorization"]["transportProtection"] == "SMB")
864
865         server = os.environ["SERVER"]
866
867         path = "//%s/IPC$" % server
868         auth = "-N"
869         call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
870
871         messages = self.waitForMessages(isLastExpectedMessage)
872         self.assertEquals(3,
873                           len(messages),
874                           "Did not receive the expected number of messages")
875
876         # Check the first message it should be an Authentication
877         msg = messages[0]
878         self.assertEquals("Authentication", msg["type"])
879         self.assertEquals("NT_STATUS_NO_SUCH_USER",
880                           msg["Authentication"]["status"])
881         self.assertEquals("SMB2",
882                           msg["Authentication"]["serviceDescription"])
883         self.assertEquals("NTLMSSP",
884                           msg["Authentication"]["authDescription"])
885         self.assertEquals("No-Password",
886                           msg["Authentication"]["passwordType"])
887         self.assertEquals(EVT_ID_UNSUCCESSFUL_LOGON,
888                           msg["Authentication"]["eventId"])
889         self.assertEquals(EVT_LOGON_NETWORK,
890                           msg["Authentication"]["logonType"])
891
892         # Check the second message it should be an Authentication
893         msg = messages[1]
894         self.assertEquals("Authentication", msg["type"])
895         self.assertEquals("NT_STATUS_OK",
896                           msg["Authentication"]["status"])
897         self.assertEquals("SMB2",
898                           msg["Authentication"]["serviceDescription"])
899         self.assertEquals("NTLMSSP",
900                           msg["Authentication"]["authDescription"])
901         self.assertEquals("No-Password",
902                           msg["Authentication"]["passwordType"])
903         self.assertEquals("ANONYMOUS LOGON",
904                           msg["Authentication"]["becameAccount"])
905         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
906                           msg["Authentication"]["eventId"])
907         self.assertEquals(EVT_LOGON_NETWORK,
908                           msg["Authentication"]["logonType"])
909
910     def test_smb_no_krb_spnego(self):
911         def isLastExpectedMessage(msg):
912             return (msg["type"] == "Authorization" and
913                     msg["Authorization"]["serviceDescription"] == "SMB" and
914                     msg["Authorization"]["authType"] == "NTLMSSP" and
915                     msg["Authorization"]["transportProtection"] == "SMB")
916
917         creds = self.insta_creds(template=self.get_credentials(),
918                                  kerberos_state=DONT_USE_KERBEROS)
919         smb.SMB(self.server,
920                 "sysvol",
921                 lp=self.get_loadparm(),
922                 creds=creds)
923
924         messages = self.waitForMessages(isLastExpectedMessage)
925         self.assertEquals(2,
926                           len(messages),
927                           "Did not receive the expected number of messages")
928         # Check the first message it should be an Authentication
929         msg = messages[0]
930         self.assertEquals("Authentication", msg["type"])
931         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
932         self.assertEquals("SMB",
933                           msg["Authentication"]["serviceDescription"])
934         self.assertEquals("NTLMSSP",
935                           msg["Authentication"]["authDescription"])
936         self.assertEquals("NTLMv2",
937                           msg["Authentication"]["passwordType"])
938         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
939                           msg["Authentication"]["eventId"])
940         self.assertEquals(EVT_LOGON_NETWORK,
941                           msg["Authentication"]["logonType"])
942
943     def test_smb_no_krb_spnego_bad_password(self):
944         def isLastExpectedMessage(msg):
945             return (msg["type"] == "Authentication" and
946                     msg["Authentication"]["serviceDescription"] == "SMB" and
947                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
948                     msg["Authentication"]["passwordType"] == "NTLMv2" and
949                     (msg["Authentication"]["status"] ==
950                         "NT_STATUS_WRONG_PASSWORD") and
951                     (msg["Authentication"]["eventId"] ==
952                         EVT_ID_UNSUCCESSFUL_LOGON) and
953                     (msg["Authentication"]["logonType"] ==
954                         EVT_LOGON_NETWORK))
955
956         creds = self.insta_creds(template=self.get_credentials(),
957                                  kerberos_state=DONT_USE_KERBEROS)
958         creds.set_password("badPassword")
959
960         thrown = False
961         try:
962             smb.SMB(self.server,
963                     "sysvol",
964                     lp=self.get_loadparm(),
965                     creds=creds)
966         except NTSTATUSError:
967             thrown = True
968         self.assertEquals(thrown, True)
969
970         messages = self.waitForMessages(isLastExpectedMessage)
971         self.assertEquals(1,
972                           len(messages),
973                           "Did not receive the expected number of messages")
974
975     def test_smb_no_krb_spnego_bad_user(self):
976         def isLastExpectedMessage(msg):
977             return (msg["type"] == "Authentication" and
978                     msg["Authentication"]["serviceDescription"] == "SMB" and
979                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
980                     msg["Authentication"]["passwordType"] == "NTLMv2" and
981                     (msg["Authentication"]["status"] ==
982                         "NT_STATUS_NO_SUCH_USER") and
983                     (msg["Authentication"]["eventId"] ==
984                         EVT_ID_UNSUCCESSFUL_LOGON) and
985                     (msg["Authentication"]["logonType"] ==
986                         EVT_LOGON_NETWORK))
987
988         creds = self.insta_creds(template=self.get_credentials(),
989                                  kerberos_state=DONT_USE_KERBEROS)
990         creds.set_username("badUser")
991
992         thrown = False
993         try:
994             smb.SMB(self.server,
995                     "sysvol",
996                     lp=self.get_loadparm(),
997                     creds=creds)
998         except NTSTATUSError:
999             thrown = True
1000         self.assertEquals(thrown, True)
1001
1002         messages = self.waitForMessages(isLastExpectedMessage)
1003         self.assertEquals(1,
1004                           len(messages),
1005                           "Did not receive the expected number of messages")
1006
1007     def test_smb_no_krb_no_spnego_no_ntlmv2(self):
1008         def isLastExpectedMessage(msg):
1009             return (msg["type"] == "Authorization" and
1010                     msg["Authorization"]["serviceDescription"] == "SMB" and
1011                     msg["Authorization"]["authType"] == "bare-NTLM" and
1012                     msg["Authorization"]["transportProtection"] == "SMB")
1013
1014         creds = self.insta_creds(template=self.get_credentials(),
1015                                  kerberos_state=DONT_USE_KERBEROS)
1016         smb.SMB(self.server,
1017                 "sysvol",
1018                 lp=self.get_loadparm(),
1019                 creds=creds,
1020                 ntlmv2_auth=False,
1021                 use_spnego=False)
1022
1023         messages = self.waitForMessages(isLastExpectedMessage)
1024         self.assertEquals(2,
1025                           len(messages),
1026                           "Did not receive the expected number of messages")
1027         # Check the first message it should be an Authentication
1028         msg = messages[0]
1029         self.assertEquals("Authentication", msg["type"])
1030         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
1031         self.assertEquals("SMB",
1032                           msg["Authentication"]["serviceDescription"])
1033         self.assertEquals("bare-NTLM",
1034                           msg["Authentication"]["authDescription"])
1035         self.assertEquals("NTLMv1",
1036                           msg["Authentication"]["passwordType"])
1037         self.assertEquals(EVT_ID_SUCCESSFUL_LOGON,
1038                           msg["Authentication"]["eventId"])
1039         self.assertEquals(EVT_LOGON_NETWORK,
1040                           msg["Authentication"]["logonType"])
1041
1042     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
1043         def isLastExpectedMessage(msg):
1044             return (msg["type"] == "Authentication" and
1045                     msg["Authentication"]["serviceDescription"] == "SMB" and
1046                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1047                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1048                     (msg["Authentication"]["status"] ==
1049                         "NT_STATUS_WRONG_PASSWORD") and
1050                     (msg["Authentication"]["eventId"] ==
1051                         EVT_ID_UNSUCCESSFUL_LOGON) and
1052                     (msg["Authentication"]["logonType"] ==
1053                         EVT_LOGON_NETWORK))
1054
1055         creds = self.insta_creds(template=self.get_credentials(),
1056                                  kerberos_state=DONT_USE_KERBEROS)
1057         creds.set_password("badPassword")
1058
1059         thrown = False
1060         try:
1061             smb.SMB(self.server,
1062                     "sysvol",
1063                     lp=self.get_loadparm(),
1064                     creds=creds,
1065                     ntlmv2_auth=False,
1066                     use_spnego=False)
1067         except NTSTATUSError:
1068             thrown = True
1069         self.assertEquals(thrown, True)
1070
1071         messages = self.waitForMessages(isLastExpectedMessage)
1072         self.assertEquals(1,
1073                           len(messages),
1074                           "Did not receive the expected number of messages")
1075
1076     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
1077         def isLastExpectedMessage(msg):
1078             return (msg["type"] == "Authentication" and
1079                     msg["Authentication"]["serviceDescription"] == "SMB" and
1080                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1081                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1082                     (msg["Authentication"]["status"] ==
1083                         "NT_STATUS_NO_SUCH_USER") and
1084                     (msg["Authentication"]["eventId"] ==
1085                         EVT_ID_UNSUCCESSFUL_LOGON) and
1086                     (msg["Authentication"]["logonType"] ==
1087                         EVT_LOGON_NETWORK))
1088
1089         creds = self.insta_creds(template=self.get_credentials(),
1090                                  kerberos_state=DONT_USE_KERBEROS)
1091         creds.set_username("badUser")
1092
1093         thrown = False
1094         try:
1095             smb.SMB(self.server,
1096                     "sysvol",
1097                     lp=self.get_loadparm(),
1098                     creds=creds,
1099                     ntlmv2_auth=False,
1100                     use_spnego=False)
1101         except NTSTATUSError:
1102             thrown = True
1103         self.assertEquals(thrown, True)
1104
1105         messages = self.waitForMessages(isLastExpectedMessage)
1106         self.assertEquals(1,
1107                           len(messages),
1108                           "Did not receive the expected number of messages")
1109
1110     def test_samlogon_interactive(self):
1111
1112         workstation = "AuthLogTests"
1113
1114         def isLastExpectedMessage(msg):
1115             return (msg["type"] == "Authentication" and
1116                     (msg["Authentication"]["serviceDescription"] ==
1117                         "SamLogon") and
1118                     (msg["Authentication"]["authDescription"] ==
1119                         "interactive") and
1120                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1121                     (msg["Authentication"]["workstation"] ==
1122                         r"\\%s" % workstation) and
1123                     (msg["Authentication"]["eventId"] ==
1124                         EVT_ID_SUCCESSFUL_LOGON) and
1125                     (msg["Authentication"]["logonType"] ==
1126                         EVT_LOGON_INTERACTIVE))
1127
1128         server = os.environ["SERVER"]
1129         user = os.environ["USERNAME"]
1130         password = os.environ["PASSWORD"]
1131         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1132
1133         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1134
1135         messages = self.waitForMessages(isLastExpectedMessage)
1136         messages = self.remove_netlogon_messages(messages)
1137         received = len(messages)
1138         self.assertIs(True,
1139                       (received == 5 or received == 6),
1140                       "Did not receive the expected number of messages")
1141
1142     def test_samlogon_interactive_bad_password(self):
1143
1144         workstation = "AuthLogTests"
1145
1146         def isLastExpectedMessage(msg):
1147             return (msg["type"] == "Authentication" and
1148                     (msg["Authentication"]["serviceDescription"] ==
1149                         "SamLogon") and
1150                     (msg["Authentication"]["authDescription"] ==
1151                         "interactive") and
1152                     (msg["Authentication"]["status"] ==
1153                         "NT_STATUS_WRONG_PASSWORD") and
1154                     (msg["Authentication"]["workstation"] ==
1155                         r"\\%s" % workstation) and
1156                     (msg["Authentication"]["eventId"] ==
1157                         EVT_ID_UNSUCCESSFUL_LOGON) and
1158                     (msg["Authentication"]["logonType"] ==
1159                         EVT_LOGON_INTERACTIVE))
1160
1161         server = os.environ["SERVER"]
1162         user = os.environ["USERNAME"]
1163         password = "badPassword"
1164         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1165
1166         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1167
1168         messages = self.waitForMessages(isLastExpectedMessage)
1169         messages = self.remove_netlogon_messages(messages)
1170         received = len(messages)
1171         self.assertIs(True,
1172                       (received == 5 or received == 6),
1173                       "Did not receive the expected number of messages")
1174
1175     def test_samlogon_interactive_bad_user(self):
1176
1177         workstation = "AuthLogTests"
1178
1179         def isLastExpectedMessage(msg):
1180             return (msg["type"] == "Authentication" and
1181                     (msg["Authentication"]["serviceDescription"] ==
1182                         "SamLogon") and
1183                     (msg["Authentication"]["authDescription"] ==
1184                         "interactive") and
1185                     (msg["Authentication"]["status"] ==
1186                         "NT_STATUS_NO_SUCH_USER") and
1187                     (msg["Authentication"]["workstation"] ==
1188                         r"\\%s" % workstation) and
1189                     (msg["Authentication"]["eventId"] ==
1190                         EVT_ID_UNSUCCESSFUL_LOGON) and
1191                     (msg["Authentication"]["logonType"] ==
1192                         EVT_LOGON_INTERACTIVE))
1193
1194         server = os.environ["SERVER"]
1195         user = "badUser"
1196         password = os.environ["PASSWORD"]
1197         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1198
1199         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1200
1201         messages = self.waitForMessages(isLastExpectedMessage)
1202         messages = self.remove_netlogon_messages(messages)
1203         received = len(messages)
1204         self.assertIs(True,
1205                       (received == 5 or received == 6),
1206                       "Did not receive the expected number of messages")
1207
1208     def test_samlogon_network(self):
1209
1210         workstation = "AuthLogTests"
1211
1212         def isLastExpectedMessage(msg):
1213             return (msg["type"] == "Authentication" and
1214                     (msg["Authentication"]["serviceDescription"] ==
1215                         "SamLogon") and
1216                     msg["Authentication"]["authDescription"] == "network" and
1217                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1218                     (msg["Authentication"]["workstation"] ==
1219                         r"\\%s" % workstation) and
1220                     (msg["Authentication"]["eventId"] ==
1221                         EVT_ID_SUCCESSFUL_LOGON) and
1222                     (msg["Authentication"]["logonType"] ==
1223                         EVT_LOGON_NETWORK))
1224
1225         server = os.environ["SERVER"]
1226         user = os.environ["USERNAME"]
1227         password = os.environ["PASSWORD"]
1228         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1229
1230         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1231
1232         messages = self.waitForMessages(isLastExpectedMessage)
1233         messages = self.remove_netlogon_messages(messages)
1234         received = len(messages)
1235         self.assertIs(True,
1236                       (received == 5 or received == 6),
1237                       "Did not receive the expected number of messages")
1238
1239     def test_samlogon_network_bad_password(self):
1240
1241         workstation = "AuthLogTests"
1242
1243         def isLastExpectedMessage(msg):
1244             return (msg["type"] == "Authentication" and
1245                     (msg["Authentication"]["serviceDescription"] ==
1246                         "SamLogon") and
1247                     msg["Authentication"]["authDescription"] == "network" and
1248                     (msg["Authentication"]["status"] ==
1249                         "NT_STATUS_WRONG_PASSWORD") and
1250                     (msg["Authentication"]["workstation"] ==
1251                         r"\\%s" % workstation) and
1252                     (msg["Authentication"]["eventId"] ==
1253                         EVT_ID_UNSUCCESSFUL_LOGON) and
1254                     (msg["Authentication"]["logonType"] ==
1255                         EVT_LOGON_NETWORK))
1256
1257         server = os.environ["SERVER"]
1258         user = os.environ["USERNAME"]
1259         password = "badPassword"
1260         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1261
1262         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1263
1264         messages = self.waitForMessages(isLastExpectedMessage)
1265         messages = self.remove_netlogon_messages(messages)
1266         received = len(messages)
1267         self.assertIs(True,
1268                       (received == 5 or received == 6),
1269                       "Did not receive the expected number of messages")
1270
1271     def test_samlogon_network_bad_user(self):
1272
1273         workstation = "AuthLogTests"
1274
1275         def isLastExpectedMessage(msg):
1276             return ((msg["type"] == "Authentication") and
1277                     (msg["Authentication"]["serviceDescription"] ==
1278                         "SamLogon") and
1279                     (msg["Authentication"]["authDescription"] == "network") and
1280                     (msg["Authentication"]["status"] ==
1281                         "NT_STATUS_NO_SUCH_USER") and
1282                     (msg["Authentication"]["workstation"] ==
1283                         r"\\%s" % workstation) and
1284                     (msg["Authentication"]["eventId"] ==
1285                         EVT_ID_UNSUCCESSFUL_LOGON) and
1286                     (msg["Authentication"]["logonType"] ==
1287                         EVT_LOGON_NETWORK))
1288
1289         server = os.environ["SERVER"]
1290         user = "badUser"
1291         password = os.environ["PASSWORD"]
1292         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1293
1294         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1295
1296         messages = self.waitForMessages(isLastExpectedMessage)
1297         messages = self.remove_netlogon_messages(messages)
1298         received = len(messages)
1299         self.assertIs(True,
1300                       (received == 5 or received == 6),
1301                       "Did not receive the expected number of messages")
1302
1303     def test_samlogon_network_mschap(self):
1304
1305         workstation = "AuthLogTests"
1306
1307         def isLastExpectedMessage(msg):
1308             return ((msg["type"] == "Authentication") and
1309                     (msg["Authentication"]["serviceDescription"] ==
1310                         "SamLogon") and
1311                     (msg["Authentication"]["authDescription"] == "network") and
1312                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1313                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1314                     (msg["Authentication"]["workstation"] ==
1315                         r"\\%s" % workstation) and
1316                     (msg["Authentication"]["eventId"] ==
1317                         EVT_ID_SUCCESSFUL_LOGON) and
1318                     (msg["Authentication"]["logonType"] ==
1319                         EVT_LOGON_NETWORK))
1320
1321         server = os.environ["SERVER"]
1322         user = os.environ["USERNAME"]
1323         password = os.environ["PASSWORD"]
1324         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1325             user, password, workstation, 2)
1326
1327         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1328
1329         messages = self.waitForMessages(isLastExpectedMessage)
1330         messages = self.remove_netlogon_messages(messages)
1331         received = len(messages)
1332         self.assertIs(True,
1333                       (received == 5 or received == 6),
1334                       "Did not receive the expected number of messages")
1335
1336     def test_samlogon_network_mschap_bad_password(self):
1337
1338         workstation = "AuthLogTests"
1339
1340         def isLastExpectedMessage(msg):
1341             return ((msg["type"] == "Authentication") and
1342                     (msg["Authentication"]["serviceDescription"] ==
1343                         "SamLogon") and
1344                     (msg["Authentication"]["authDescription"] == "network") and
1345                     (msg["Authentication"]["status"] ==
1346                         "NT_STATUS_WRONG_PASSWORD") and
1347                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1348                     (msg["Authentication"]["workstation"] ==
1349                         r"\\%s" % workstation) and
1350                     (msg["Authentication"]["eventId"] ==
1351                         EVT_ID_UNSUCCESSFUL_LOGON) and
1352                     (msg["Authentication"]["logonType"] ==
1353                         EVT_LOGON_NETWORK))
1354
1355         server = os.environ["SERVER"]
1356         user = os.environ["USERNAME"]
1357         password = "badPassword"
1358         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1359             user, password, workstation, 2)
1360
1361         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1362
1363         messages = self.waitForMessages(isLastExpectedMessage)
1364         messages = self.remove_netlogon_messages(messages)
1365         received = len(messages)
1366         self.assertIs(True,
1367                       (received == 5 or received == 6),
1368                       "Did not receive the expected number of messages")
1369
1370     def test_samlogon_network_mschap_bad_user(self):
1371
1372         workstation = "AuthLogTests"
1373
1374         def isLastExpectedMessage(msg):
1375             return ((msg["type"] == "Authentication") and
1376                     (msg["Authentication"]["serviceDescription"] ==
1377                         "SamLogon") and
1378                     (msg["Authentication"]["authDescription"] == "network") and
1379                     (msg["Authentication"]["status"] ==
1380                         "NT_STATUS_NO_SUCH_USER") and
1381                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1382                     (msg["Authentication"]["workstation"] ==
1383                         r"\\%s" % workstation) and
1384                     (msg["Authentication"]["eventId"] ==
1385                         EVT_ID_UNSUCCESSFUL_LOGON) and
1386                     (msg["Authentication"]["logonType"] ==
1387                         EVT_LOGON_NETWORK))
1388
1389         server = os.environ["SERVER"]
1390         user = "badUser"
1391         password = os.environ["PASSWORD"]
1392         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1393             user, password, workstation, 2)
1394
1395         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1396
1397         messages = self.waitForMessages(isLastExpectedMessage)
1398         messages = self.remove_netlogon_messages(messages)
1399         received = len(messages)
1400         self.assertIs(True,
1401                       (received == 5 or received == 6),
1402                       "Did not receive the expected number of messages")
1403
1404     def test_samlogon_schannel_seal(self):
1405
1406         workstation = "AuthLogTests"
1407
1408         def isLastExpectedMessage(msg):
1409             return ((msg["type"] == "Authentication") and
1410                     (msg["Authentication"]["serviceDescription"] ==
1411                         "SamLogon") and
1412                     (msg["Authentication"]["authDescription"] == "network") and
1413                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1414                     (msg["Authentication"]["workstation"] ==
1415                         r"\\%s" % workstation) and
1416                     (msg["Authentication"]["eventId"] ==
1417                         EVT_ID_SUCCESSFUL_LOGON) and
1418                     (msg["Authentication"]["logonType"] ==
1419                         EVT_LOGON_NETWORK))
1420
1421         server = os.environ["SERVER"]
1422         user = os.environ["USERNAME"]
1423         password = os.environ["PASSWORD"]
1424         samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
1425
1426         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1427
1428         messages = self.waitForMessages(isLastExpectedMessage)
1429         messages = self.remove_netlogon_messages(messages)
1430         received = len(messages)
1431         self.assertIs(True,
1432                       (received == 5 or received == 6),
1433                       "Did not receive the expected number of messages")
1434
1435         # Check the second to last message it should be an Authorization
1436         msg = messages[-2]
1437         self.assertEquals("Authorization", msg["type"])
1438         self.assertEquals("DCE/RPC",
1439                           msg["Authorization"]["serviceDescription"])
1440         self.assertEquals("schannel", msg["Authorization"]["authType"])
1441         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1442         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1443
1444     # Signed logons get promoted to sealed, this test ensures that
1445     # this behaviour is not removed accidentally
1446     def test_samlogon_schannel_sign(self):
1447
1448         workstation = "AuthLogTests"
1449
1450         def isLastExpectedMessage(msg):
1451             return ((msg["type"] == "Authentication") and
1452                     (msg["Authentication"]["serviceDescription"] ==
1453                         "SamLogon") and
1454                     (msg["Authentication"]["authDescription"] == "network") and
1455                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1456                     (msg["Authentication"]["workstation"] ==
1457                         r"\\%s" % workstation) and
1458                     (msg["Authentication"]["eventId"] ==
1459                         EVT_ID_SUCCESSFUL_LOGON) and
1460                     (msg["Authentication"]["logonType"] ==
1461                         EVT_LOGON_NETWORK))
1462
1463         server = os.environ["SERVER"]
1464         user = os.environ["USERNAME"]
1465         password = os.environ["PASSWORD"]
1466         samlogon = "schannelsign;samlogon %s %s %s" % (
1467             user, password, workstation)
1468
1469         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1470
1471         messages = self.waitForMessages(isLastExpectedMessage)
1472         messages = self.remove_netlogon_messages(messages)
1473         received = len(messages)
1474         self.assertIs(True,
1475                       (received == 5 or received == 6),
1476                       "Did not receive the expected number of messages")
1477
1478         # Check the second to last message it should be an Authorization
1479         msg = messages[-2]
1480         self.assertEquals("Authorization", msg["type"])
1481         self.assertEquals("DCE/RPC",
1482                           msg["Authorization"]["serviceDescription"])
1483         self.assertEquals("schannel", msg["Authorization"]["authType"])
1484         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1485         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))