python:tests: Store keys as bytes rather than as lists of ints
[samba.git] / python / samba / tests / auth_log.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for the Auth and AuthZ logging.
19 """
20 import samba.tests
21 from samba.dcerpc import srvsvc, dnsserver
22 import os
23 from samba.samba3 import libsmb_samba_internal as libsmb
24 from samba.samba3 import param as s3param
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 from samba.dcerpc.windows_event_ids import (
32     EVT_ID_SUCCESSFUL_LOGON,
33     EVT_ID_UNSUCCESSFUL_LOGON,
34     EVT_LOGON_NETWORK,
35     EVT_LOGON_INTERACTIVE,
36     EVT_LOGON_NETWORK_CLEAR_TEXT
37 )
38 import re
39
40
41 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
42
43     def setUp(self):
44         super(AuthLogTests, self).setUp()
45         self.remoteAddress = os.environ["CLIENT_IP"]
46
47     def tearDown(self):
48         super(AuthLogTests, self).tearDown()
49
50     def smb_connection(self, creds, use_spnego="yes", ntlmv2_auth="yes",
51                        force_smb1=False):
52         # the SMB bindings rely on having a s3 loadparm
53         lp = self.get_loadparm()
54         s3_lp = s3param.get_context()
55         s3_lp.load(lp.configfile)
56
57         # Allow the testcase to skip SPNEGO or use NTLMv1
58         s3_lp.set("client use spnego", use_spnego)
59         s3_lp.set("client ntlmv2 auth", ntlmv2_auth)
60
61         return libsmb.Conn(self.server, "sysvol", lp=s3_lp, creds=creds,
62                            force_smb1=force_smb1)
63
64     def _test_rpc_ncacn_np(self, authTypes, creds, service,
65                            binding, protection, checkFunction):
66         def isLastExpectedMessage(msg):
67             return (msg["type"] == "Authorization" and
68                     (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
69                      msg["Authorization"]["serviceDescription"] == service) and
70                     msg["Authorization"]["authType"] == authTypes[0] and
71                     msg["Authorization"]["transportProtection"] == protection)
72
73         if binding:
74             binding = "[%s]" % binding
75
76         if service == "dnsserver":
77             x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
78                                     self.get_loadparm(),
79                                     creds)
80         elif service == "srvsvc":
81             x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
82                               self.get_loadparm(),
83                               creds)
84
85         # The connection is passed to ensure the server
86         # messaging context stays up until all the messages have been received.
87         messages = self.waitForMessages(isLastExpectedMessage, x)
88         checkFunction(messages, authTypes, service, binding, protection)
89
90     def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
91         # Turn "[foo,bar]" into a list ("foo", "bar") to test
92         # lambda x: x removes anything that evaluates to False,
93         # including empty strings, so we handle "" as well
94         binding_list = \
95             list(filter(lambda x: x, re.compile(r'[\[,\]]').split(binding)))
96
97         # Handle explicit smb2, smb1 or auto negotiation
98         if "smb2" in binding_list:
99             self.assertEqual(serviceDescription, "SMB2")
100         elif "smb1" in binding_list:
101             self.assertEqual(serviceDescription, "SMB")
102         else:
103             self.assertIn(serviceDescription, ["SMB", "SMB2"])
104
105     def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
106                                 binding, protection):
107
108         expected_messages = len(authTypes)
109         self.assertEqual(expected_messages,
110                           len(messages),
111                           "Did not receive the expected number of messages")
112
113         # Check the first message it should be an Authentication
114         msg = messages[0]
115         self.assertEqual("Authentication", msg["type"])
116         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
117         self.assertEqual(
118             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
119         self.assertEqual(
120             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
121         self._assert_ncacn_np_serviceDescription(
122             binding, msg["Authentication"]["serviceDescription"])
123         self.assertEqual(authTypes[1],
124                           msg["Authentication"]["authDescription"])
125
126         # Check the second message it should be an Authorization
127         msg = messages[1]
128         self.assertEqual("Authorization", msg["type"])
129         self._assert_ncacn_np_serviceDescription(
130             binding, msg["Authorization"]["serviceDescription"])
131         self.assertEqual(authTypes[2], msg["Authorization"]["authType"])
132         self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
133         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
134
135         # Check the third message it should be an Authentication
136         # if we are expecting 4 messages
137         if expected_messages == 4:
138             def checkServiceDescription(desc):
139                 return (desc == "DCE/RPC" or desc == service)
140
141             msg = messages[2]
142             self.assertEqual("Authentication", msg["type"])
143             self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
144             self.assertTrue(
145                 checkServiceDescription(
146                     msg["Authentication"]["serviceDescription"]))
147
148             self.assertEqual(authTypes[3],
149                               msg["Authentication"]["authDescription"])
150             self.assertEqual(
151                 EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
152             self.assertEqual(
153                 EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
154
155     def rpc_ncacn_np_krb5_check(
156             self,
157             messages,
158             authTypes,
159             service,
160             binding,
161             protection):
162
163         expected_messages = len(authTypes)
164         self.assertEqual(expected_messages,
165                           len(messages),
166                           "Did not receive the expected number of messages")
167
168         # Check the first message it should be an Authentication
169         # This is almost certainly Authentication over UDP, and is probably
170         # returning message too big,
171         msg = messages[0]
172         self.assertEqual("Authentication", msg["type"])
173         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
174         self.assertEqual("Kerberos KDC",
175                           msg["Authentication"]["serviceDescription"])
176         self.assertEqual(authTypes[1],
177                           msg["Authentication"]["authDescription"])
178         self.assertEqual(
179             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
180         self.assertEqual(
181             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
182
183         # Check the second message it should be an Authentication
184         # This this the TCP Authentication in response to the message too big
185         # response to the UDP Authentication
186         msg = messages[1]
187         self.assertEqual("Authentication", msg["type"])
188         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
189         self.assertEqual("Kerberos KDC",
190                           msg["Authentication"]["serviceDescription"])
191         self.assertEqual(authTypes[2],
192                           msg["Authentication"]["authDescription"])
193         self.assertEqual(
194             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
195         self.assertEqual(
196             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
197
198         # Check the third message it should be an Authorization
199         msg = messages[2]
200         self.assertEqual("Authorization", msg["type"])
201         self._assert_ncacn_np_serviceDescription(
202             binding, msg["Authorization"]["serviceDescription"])
203         self.assertEqual(authTypes[3], msg["Authorization"]["authType"])
204         self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
205         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
206
207     def test_rpc_ncacn_np_ntlm_dns_sign(self):
208         creds = self.insta_creds(template=self.get_credentials(),
209                                  kerberos_state=DONT_USE_KERBEROS)
210         self._test_rpc_ncacn_np(["NTLMSSP",
211                                  "NTLMSSP",
212                                  "NTLMSSP",
213                                  "NTLMSSP"],
214                                 creds, "dnsserver", "sign", "SIGN",
215                                 self.rpc_ncacn_np_ntlm_check)
216
217     def test_rpc_ncacn_np_ntlm_srv_sign(self):
218         creds = self.insta_creds(template=self.get_credentials(),
219                                  kerberos_state=DONT_USE_KERBEROS)
220         self._test_rpc_ncacn_np(["NTLMSSP",
221                                  "NTLMSSP",
222                                  "NTLMSSP",
223                                  "NTLMSSP"],
224                                 creds, "srvsvc", "sign", "SIGN",
225                                 self.rpc_ncacn_np_ntlm_check)
226
227     def test_rpc_ncacn_np_ntlm_dns(self):
228         creds = self.insta_creds(template=self.get_credentials(),
229                                  kerberos_state=DONT_USE_KERBEROS)
230         self._test_rpc_ncacn_np(["ncacn_np",
231                                  "NTLMSSP",
232                                  "NTLMSSP"],
233                                 creds, "dnsserver", "", "SMB",
234                                 self.rpc_ncacn_np_ntlm_check)
235
236     def test_rpc_ncacn_np_ntlm_srv(self):
237         creds = self.insta_creds(template=self.get_credentials(),
238                                  kerberos_state=DONT_USE_KERBEROS)
239         self._test_rpc_ncacn_np(["ncacn_np",
240                                  "NTLMSSP",
241                                  "NTLMSSP"],
242                                 creds, "srvsvc", "", "SMB",
243                                 self.rpc_ncacn_np_ntlm_check)
244
245     def test_rpc_ncacn_np_krb_dns_sign(self):
246         creds = self.insta_creds(template=self.get_credentials(),
247                                  kerberos_state=MUST_USE_KERBEROS)
248         self._test_rpc_ncacn_np(["krb5",
249                                  "ENC-TS Pre-authentication",
250                                  "ENC-TS Pre-authentication",
251                                  "krb5"],
252                                 creds, "dnsserver", "sign", "SIGN",
253                                 self.rpc_ncacn_np_krb5_check)
254
255     def test_rpc_ncacn_np_krb_srv_sign(self):
256         creds = self.insta_creds(template=self.get_credentials(),
257                                  kerberos_state=MUST_USE_KERBEROS)
258         self._test_rpc_ncacn_np(["krb5",
259                                  "ENC-TS Pre-authentication",
260                                  "ENC-TS Pre-authentication",
261                                  "krb5"],
262                                 creds, "srvsvc", "sign", "SIGN",
263                                 self.rpc_ncacn_np_krb5_check)
264
265     def test_rpc_ncacn_np_krb_dns(self):
266         creds = self.insta_creds(template=self.get_credentials(),
267                                  kerberos_state=MUST_USE_KERBEROS)
268         self._test_rpc_ncacn_np(["ncacn_np",
269                                  "ENC-TS Pre-authentication",
270                                  "ENC-TS Pre-authentication",
271                                  "krb5"],
272                                 creds, "dnsserver", "", "SMB",
273                                 self.rpc_ncacn_np_krb5_check)
274
275     def test_rpc_ncacn_np_krb_dns_smb2(self):
276         creds = self.insta_creds(template=self.get_credentials(),
277                                  kerberos_state=MUST_USE_KERBEROS)
278         self._test_rpc_ncacn_np(["ncacn_np",
279                                  "ENC-TS Pre-authentication",
280                                  "ENC-TS Pre-authentication",
281                                  "krb5"],
282                                 creds, "dnsserver", "smb2", "SMB",
283                                 self.rpc_ncacn_np_krb5_check)
284
285     def test_rpc_ncacn_np_krb_srv(self):
286         creds = self.insta_creds(template=self.get_credentials(),
287                                  kerberos_state=MUST_USE_KERBEROS)
288         self._test_rpc_ncacn_np(["ncacn_np",
289                                  "ENC-TS Pre-authentication",
290                                  "ENC-TS Pre-authentication",
291                                  "krb5"],
292                                 creds, "srvsvc", "", "SMB",
293                                 self.rpc_ncacn_np_krb5_check)
294
295     def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
296                                binding, protection, checkFunction):
297         def isLastExpectedMessage(msg):
298             return (msg["type"] == "Authorization" and
299                     msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
300                     msg["Authorization"]["authType"] == authTypes[0] and
301                     msg["Authorization"]["transportProtection"] == protection)
302
303         if binding:
304             binding = "[%s]" % binding
305
306         if service == "dnsserver":
307             conn = dnsserver.dnsserver(
308                 "ncacn_ip_tcp:%s%s" % (self.server, binding),
309                 self.get_loadparm(),
310                 creds)
311         elif service == "srvsvc":
312             conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
313                                  self.get_loadparm(),
314                                  creds)
315
316         messages = self.waitForMessages(isLastExpectedMessage, conn)
317         checkFunction(messages, authTypes, service, binding, protection)
318
319     def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
320                                     binding, protection):
321
322         expected_messages = len(authTypes)
323         self.assertEqual(expected_messages,
324                           len(messages),
325                           "Did not receive the expected number of messages")
326
327         # Check the first message it should be an Authorization
328         msg = messages[0]
329         self.assertEqual("Authorization", msg["type"])
330         self.assertEqual("DCE/RPC",
331                           msg["Authorization"]["serviceDescription"])
332         self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
333         self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
334         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
335
336         # Check the second message it should be an Authentication
337         msg = messages[1]
338         self.assertEqual("Authentication", msg["type"])
339         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
340         self.assertEqual("DCE/RPC",
341                           msg["Authentication"]["serviceDescription"])
342         self.assertEqual(authTypes[2],
343                           msg["Authentication"]["authDescription"])
344         self.assertEqual(
345             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
346         self.assertEqual(
347             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
348
349     def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
350                                     binding, protection):
351
352         expected_messages = len(authTypes)
353         self.assertEqual(expected_messages,
354                           len(messages),
355                           "Did not receive the expected number of messages")
356
357         # Check the first message it should be an Authorization
358         msg = messages[0]
359         self.assertEqual("Authorization", msg["type"])
360         self.assertEqual("DCE/RPC",
361                           msg["Authorization"]["serviceDescription"])
362         self.assertEqual(authTypes[1], msg["Authorization"]["authType"])
363         self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
364         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
365
366         # Check the second message it should be an Authentication
367         msg = messages[1]
368         self.assertEqual("Authentication", msg["type"])
369         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
370         self.assertEqual("Kerberos KDC",
371                           msg["Authentication"]["serviceDescription"])
372         self.assertEqual(authTypes[2],
373                           msg["Authentication"]["authDescription"])
374         self.assertEqual(
375             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
376         self.assertEqual(
377             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
378
379         # Check the third message it should be an Authentication
380         msg = messages[2]
381         self.assertEqual("Authentication", msg["type"])
382         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
383         self.assertEqual("Kerberos KDC",
384                           msg["Authentication"]["serviceDescription"])
385         self.assertEqual(authTypes[2],
386                           msg["Authentication"]["authDescription"])
387         self.assertEqual(
388             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
389         self.assertEqual(
390             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
391
392     def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
393         creds = self.insta_creds(template=self.get_credentials(),
394                                  kerberos_state=DONT_USE_KERBEROS)
395         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
396                                      "ncacn_ip_tcp",
397                                      "NTLMSSP"],
398                                     creds, "dnsserver", "sign", "SIGN",
399                                     self.rpc_ncacn_ip_tcp_ntlm_check)
400
401     def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
402         creds = self.insta_creds(template=self.get_credentials(),
403                                  kerberos_state=MUST_USE_KERBEROS)
404         self._test_rpc_ncacn_ip_tcp(["krb5",
405                                      "ncacn_ip_tcp",
406                                      "ENC-TS Pre-authentication",
407                                      "ENC-TS Pre-authentication"],
408                                     creds, "dnsserver", "sign", "SIGN",
409                                     self.rpc_ncacn_ip_tcp_krb5_check)
410
411     def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
412         creds = self.insta_creds(template=self.get_credentials(),
413                                  kerberos_state=DONT_USE_KERBEROS)
414         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
415                                      "ncacn_ip_tcp",
416                                      "NTLMSSP"],
417                                     creds, "dnsserver", "", "SIGN",
418                                     self.rpc_ncacn_ip_tcp_ntlm_check)
419
420     def test_rpc_ncacn_ip_tcp_krb5_dns(self):
421         creds = self.insta_creds(template=self.get_credentials(),
422                                  kerberos_state=MUST_USE_KERBEROS)
423         self._test_rpc_ncacn_ip_tcp(["krb5",
424                                      "ncacn_ip_tcp",
425                                      "ENC-TS Pre-authentication",
426                                      "ENC-TS Pre-authentication"],
427                                     creds, "dnsserver", "", "SIGN",
428                                     self.rpc_ncacn_ip_tcp_krb5_check)
429
430     def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
431         creds = self.insta_creds(template=self.get_credentials(),
432                                  kerberos_state=DONT_USE_KERBEROS)
433         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
434                                      "ncacn_ip_tcp",
435                                      "NTLMSSP"],
436                                     creds, "dnsserver", "connect", "NONE",
437                                     self.rpc_ncacn_ip_tcp_ntlm_check)
438
439     def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
440         creds = self.insta_creds(template=self.get_credentials(),
441                                  kerberos_state=MUST_USE_KERBEROS)
442         self._test_rpc_ncacn_ip_tcp(["krb5",
443                                      "ncacn_ip_tcp",
444                                      "ENC-TS Pre-authentication",
445                                      "ENC-TS Pre-authentication"],
446                                     creds, "dnsserver", "connect", "NONE",
447                                     self.rpc_ncacn_ip_tcp_krb5_check)
448
449     def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
450         creds = self.insta_creds(template=self.get_credentials(),
451                                  kerberos_state=DONT_USE_KERBEROS)
452         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
453                                      "ncacn_ip_tcp",
454                                      "NTLMSSP"],
455                                     creds, "dnsserver", "seal", "SEAL",
456                                     self.rpc_ncacn_ip_tcp_ntlm_check)
457
458     def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
459         creds = self.insta_creds(template=self.get_credentials(),
460                                  kerberos_state=MUST_USE_KERBEROS)
461         self._test_rpc_ncacn_ip_tcp(["krb5",
462                                      "ncacn_ip_tcp",
463                                      "ENC-TS Pre-authentication",
464                                      "ENC-TS Pre-authentication"],
465                                     creds, "dnsserver", "seal", "SEAL",
466                                     self.rpc_ncacn_ip_tcp_krb5_check)
467
468     def test_ldap(self):
469
470         def isLastExpectedMessage(msg):
471             return (msg["type"] == "Authorization" and
472                     msg["Authorization"]["serviceDescription"] == "LDAP" and
473                     msg["Authorization"]["transportProtection"] == "SIGN" and
474                     msg["Authorization"]["authType"] == "krb5")
475
476         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
477                            lp=self.get_loadparm(),
478                            credentials=self.get_credentials())
479
480         messages = self.waitForMessages(isLastExpectedMessage)
481         self.assertEqual(3,
482                           len(messages),
483                           "Did not receive the expected number of messages")
484
485         # Check the first message it should be an Authentication
486         msg = messages[0]
487         self.assertEqual("Authentication", msg["type"])
488         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
489         self.assertEqual("Kerberos KDC",
490                           msg["Authentication"]["serviceDescription"])
491         self.assertEqual("ENC-TS Pre-authentication",
492                           msg["Authentication"]["authDescription"])
493         self.assertTrue(msg["Authentication"]["duration"] > 0)
494         self.assertEqual(
495             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
496         self.assertEqual(
497             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
498
499         # Check the second message it should be an Authentication
500         msg = messages[1]
501         self.assertEqual("Authentication", msg["type"])
502         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
503         self.assertEqual("Kerberos KDC",
504                           msg["Authentication"]["serviceDescription"])
505         self.assertEqual("ENC-TS Pre-authentication",
506                           msg["Authentication"]["authDescription"])
507         self.assertTrue(msg["Authentication"]["duration"] > 0)
508         self.assertEqual(
509             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
510         self.assertEqual(
511             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
512
513     def test_ldap_ntlm(self):
514
515         def isLastExpectedMessage(msg):
516             return (msg["type"] == "Authorization" and
517                     msg["Authorization"]["serviceDescription"] == "LDAP" and
518                     msg["Authorization"]["transportProtection"] == "SEAL" and
519                     msg["Authorization"]["authType"] == "NTLMSSP")
520
521         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
522                            lp=self.get_loadparm(),
523                            credentials=self.get_credentials())
524
525         messages = self.waitForMessages(isLastExpectedMessage)
526         self.assertEqual(2,
527                           len(messages),
528                           "Did not receive the expected number of messages")
529         # Check the first message it should be an Authentication
530         msg = messages[0]
531         self.assertEqual("Authentication", msg["type"])
532         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
533         self.assertEqual("LDAP",
534                           msg["Authentication"]["serviceDescription"])
535         self.assertEqual("NTLMSSP", msg["Authentication"]["authDescription"])
536         self.assertTrue(msg["Authentication"]["duration"] > 0)
537         self.assertEqual(
538             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
539         self.assertEqual(
540             EVT_LOGON_NETWORK, msg["Authentication"]["logonType"])
541
542     def test_ldap_simple_bind(self):
543         def isLastExpectedMessage(msg):
544             return (msg["type"] == "Authorization" and
545                     msg["Authorization"]["serviceDescription"] == "LDAP" and
546                     msg["Authorization"]["transportProtection"] == "TLS" and
547                     msg["Authorization"]["authType"] == "simple bind")
548
549         creds = self.insta_creds(template=self.get_credentials())
550         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
551                                       creds.get_username()))
552
553         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
554                            lp=self.get_loadparm(),
555                            credentials=creds)
556
557         messages = self.waitForMessages(isLastExpectedMessage)
558         self.assertEqual(2,
559                           len(messages),
560                           "Did not receive the expected number of messages")
561
562         # Check the first message it should be an Authentication
563         msg = messages[0]
564         self.assertEqual("Authentication", msg["type"])
565         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
566         self.assertEqual("LDAP",
567                           msg["Authentication"]["serviceDescription"])
568         self.assertEqual("simple bind/TLS",
569                           msg["Authentication"]["authDescription"])
570         self.assertEqual(
571             EVT_ID_SUCCESSFUL_LOGON, msg["Authentication"]["eventId"])
572         self.assertEqual(
573             EVT_LOGON_NETWORK_CLEAR_TEXT, msg["Authentication"]["logonType"])
574
575     def test_ldap_simple_bind_bad_password(self):
576         def isLastExpectedMessage(msg):
577             return (msg["type"] == "Authentication" and
578                     msg["Authentication"]["serviceDescription"] == "LDAP" and
579                     (msg["Authentication"]["status"] ==
580                         "NT_STATUS_WRONG_PASSWORD") and
581                     (msg["Authentication"]["authDescription"] ==
582                         "simple bind/TLS") and
583                     (msg["Authentication"]["eventId"] ==
584                         EVT_ID_UNSUCCESSFUL_LOGON) and
585                     (msg["Authentication"]["logonType"] ==
586                         EVT_LOGON_NETWORK_CLEAR_TEXT))
587
588         creds = self.insta_creds(template=self.get_credentials())
589         creds.set_password("badPassword")
590         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
591                                       creds.get_username()))
592
593         thrown = False
594         try:
595             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
596                                lp=self.get_loadparm(),
597                                credentials=creds)
598         except LdbError:
599             thrown = True
600         self.assertEqual(thrown, True)
601
602         messages = self.waitForMessages(isLastExpectedMessage)
603         self.assertEqual(1,
604                           len(messages),
605                           "Did not receive the expected number of messages")
606
607     def test_ldap_simple_bind_bad_user(self):
608         def isLastExpectedMessage(msg):
609             return (msg["type"] == "Authentication" and
610                     msg["Authentication"]["serviceDescription"] == "LDAP" and
611                     (msg["Authentication"]["status"] ==
612                         "NT_STATUS_NO_SUCH_USER") and
613                     (msg["Authentication"]["authDescription"] ==
614                         "simple bind/TLS") and
615                     (msg["Authentication"]["eventId"] ==
616                         EVT_ID_UNSUCCESSFUL_LOGON) and
617                     (msg["Authentication"]["logonType"] ==
618                         EVT_LOGON_NETWORK_CLEAR_TEXT))
619
620         creds = self.insta_creds(template=self.get_credentials())
621         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
622
623         thrown = False
624         try:
625             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
626                                lp=self.get_loadparm(),
627                                credentials=creds)
628         except LdbError:
629             thrown = True
630         self.assertEqual(thrown, True)
631
632         messages = self.waitForMessages(isLastExpectedMessage)
633         self.assertEqual(1,
634                           len(messages),
635                           "Did not receive the expected number of messages")
636
637     def test_ldap_simple_bind_unparseable_user(self):
638         def isLastExpectedMessage(msg):
639             return (msg["type"] == "Authentication" and
640                     msg["Authentication"]["serviceDescription"] == "LDAP" and
641                     (msg["Authentication"]["status"] ==
642                         "NT_STATUS_NO_SUCH_USER") and
643                     (msg["Authentication"]["authDescription"] ==
644                         "simple bind/TLS") and
645                     (msg["Authentication"]["eventId"] ==
646                         EVT_ID_UNSUCCESSFUL_LOGON) and
647                     (msg["Authentication"]["logonType"] ==
648                         EVT_LOGON_NETWORK_CLEAR_TEXT))
649
650         creds = self.insta_creds(template=self.get_credentials())
651         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
652
653         thrown = False
654         try:
655             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
656                                lp=self.get_loadparm(),
657                                credentials=creds)
658         except LdbError:
659             thrown = True
660         self.assertEqual(thrown, True)
661
662         messages = self.waitForMessages(isLastExpectedMessage)
663         self.assertEqual(1,
664                           len(messages),
665                           "Did not receive the expected number of messages")
666
667     #
668     # Note: as this test does not expect any messages it will
669     #       time out in the call to self.waitForMessages.
670     #       This is expected, but it will slow this test.
671     def test_ldap_anonymous_access_bind_only(self):
672         # Should be no logging for anonymous bind
673         # so receiving any message indicates a failure.
674         def isLastExpectedMessage(msg):
675             return True
676
677         creds = self.insta_creds(template=self.get_credentials())
678         creds.set_anonymous()
679
680         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
681                            lp=self.get_loadparm(),
682                            credentials=creds)
683
684         messages = self.waitForMessages(isLastExpectedMessage)
685         self.assertEqual(0,
686                           len(messages),
687                           "Did not receive the expected number of messages")
688
689     def test_ldap_anonymous_access(self):
690         def isLastExpectedMessage(msg):
691             return (msg["type"] == "Authorization" and
692                     msg["Authorization"]["serviceDescription"] == "LDAP" and
693                     msg["Authorization"]["transportProtection"] == "TLS" and
694                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
695                     msg["Authorization"]["authType"] == "no bind")
696
697         creds = self.insta_creds(template=self.get_credentials())
698         creds.set_anonymous()
699
700         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
701                            lp=self.get_loadparm(),
702                            credentials=creds)
703
704         try:
705             self.samdb.search(base=self.samdb.domain_dn())
706             self.fail("Expected an LdbError exception")
707         except LdbError:
708             pass
709
710         messages = self.waitForMessages(isLastExpectedMessage)
711         self.assertEqual(1,
712                           len(messages),
713                           "Did not receive the expected number of messages")
714
715     def test_smb(self):
716         def isLastExpectedMessage(msg):
717             return (msg["type"] == "Authorization" and
718                     "SMB" in msg["Authorization"]["serviceDescription"] and
719                     msg["Authorization"]["authType"] == "krb5" and
720                     msg["Authorization"]["transportProtection"] == "SMB")
721
722         creds = self.insta_creds(template=self.get_credentials())
723         self.smb_connection(creds)
724
725         messages = self.waitForMessages(isLastExpectedMessage)
726         self.assertEqual(3,
727                           len(messages),
728                           "Did not receive the expected number of messages")
729         # Check the first message it should be an Authentication
730         msg = messages[0]
731         self.assertEqual("Authentication", msg["type"])
732         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
733         self.assertEqual("Kerberos KDC",
734                           msg["Authentication"]["serviceDescription"])
735         self.assertEqual("ENC-TS Pre-authentication",
736                           msg["Authentication"]["authDescription"])
737         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
738                           msg["Authentication"]["eventId"])
739         self.assertEqual(EVT_LOGON_NETWORK,
740                           msg["Authentication"]["logonType"])
741
742         # Check the second message it should be an Authentication
743         msg = messages[1]
744         self.assertEqual("Authentication", msg["type"])
745         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
746         self.assertEqual("Kerberos KDC",
747                           msg["Authentication"]["serviceDescription"])
748         self.assertEqual("ENC-TS Pre-authentication",
749                           msg["Authentication"]["authDescription"])
750         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
751                           msg["Authentication"]["eventId"])
752         self.assertEqual(EVT_LOGON_NETWORK,
753                           msg["Authentication"]["logonType"])
754
755     def test_smb_bad_password(self):
756         def isLastExpectedMessage(msg):
757             return (msg["type"] == "Authentication" and
758                     (msg["Authentication"]["serviceDescription"] ==
759                         "Kerberos KDC") and
760                     (msg["Authentication"]["status"] ==
761                         "NT_STATUS_WRONG_PASSWORD") and
762                     (msg["Authentication"]["authDescription"] ==
763                         "ENC-TS Pre-authentication"))
764
765         creds = self.insta_creds(template=self.get_credentials())
766         creds.set_kerberos_state(MUST_USE_KERBEROS)
767         creds.set_password("badPassword")
768
769         thrown = False
770         try:
771             self.smb_connection(creds)
772         except NTSTATUSError:
773             thrown = True
774         self.assertEqual(thrown, True)
775
776         messages = self.waitForMessages(isLastExpectedMessage)
777         self.assertEqual(1,
778                           len(messages),
779                           "Did not receive the expected number of messages")
780
781     def test_smb_bad_user(self):
782         def isLastExpectedMessage(msg):
783             return (msg["type"] == "Authentication" and
784                     (msg["Authentication"]["serviceDescription"] ==
785                         "Kerberos KDC") and
786                     (msg["Authentication"]["status"] ==
787                         "NT_STATUS_NO_SUCH_USER") and
788                     (msg["Authentication"]["authDescription"] ==
789                         "AS-REQ") and
790                     (msg["Authentication"]["eventId"] ==
791                         EVT_ID_UNSUCCESSFUL_LOGON) and
792                     (msg["Authentication"]["logonType"] ==
793                         EVT_LOGON_NETWORK))
794
795         creds = self.insta_creds(template=self.get_credentials())
796         creds.set_kerberos_state(MUST_USE_KERBEROS)
797         creds.set_username("badUser")
798
799         thrown = False
800         try:
801             self.smb_connection(creds)
802         except NTSTATUSError:
803             thrown = True
804         self.assertEqual(thrown, True)
805
806         messages = self.waitForMessages(isLastExpectedMessage)
807         self.assertEqual(1,
808                           len(messages),
809                           "Did not receive the expected number of messages")
810
811     def test_smb1_anonymous(self):
812         def isLastExpectedMessage(msg):
813             return (msg["type"] == "Authorization" and
814                     msg["Authorization"]["serviceDescription"] == "SMB" and
815                     msg["Authorization"]["authType"] == "NTLMSSP" and
816                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
817                     msg["Authorization"]["transportProtection"] == "SMB")
818
819         server = os.environ["SERVER"]
820
821         path = "//%s/IPC$" % server
822         auth = "-N"
823         call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
824
825         messages = self.waitForMessages(isLastExpectedMessage)
826         self.assertEqual(3,
827                           len(messages),
828                           "Did not receive the expected number of messages")
829
830         # Check the first message it should be an Authentication
831         msg = messages[0]
832         self.assertEqual("Authentication", msg["type"])
833         self.assertEqual("NT_STATUS_NO_SUCH_USER",
834                           msg["Authentication"]["status"])
835         self.assertEqual("SMB",
836                           msg["Authentication"]["serviceDescription"])
837         self.assertEqual("NTLMSSP",
838                           msg["Authentication"]["authDescription"])
839         self.assertEqual("No-Password",
840                           msg["Authentication"]["passwordType"])
841         self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
842                           msg["Authentication"]["eventId"])
843         self.assertEqual(EVT_LOGON_NETWORK,
844                           msg["Authentication"]["logonType"])
845
846         # Check the second message it should be an Authentication
847         msg = messages[1]
848         self.assertEqual("Authentication", msg["type"])
849         self.assertEqual("NT_STATUS_OK",
850                           msg["Authentication"]["status"])
851         self.assertEqual("SMB",
852                           msg["Authentication"]["serviceDescription"])
853         self.assertEqual("NTLMSSP",
854                           msg["Authentication"]["authDescription"])
855         self.assertEqual("No-Password",
856                           msg["Authentication"]["passwordType"])
857         self.assertEqual("ANONYMOUS LOGON",
858                           msg["Authentication"]["becameAccount"])
859         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
860                           msg["Authentication"]["eventId"])
861         self.assertEqual(EVT_LOGON_NETWORK,
862                           msg["Authentication"]["logonType"])
863
864     def test_smb2_anonymous(self):
865         def isLastExpectedMessage(msg):
866             return (msg["type"] == "Authorization" and
867                     msg["Authorization"]["serviceDescription"] == "SMB2" and
868                     msg["Authorization"]["authType"] == "NTLMSSP" and
869                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
870                     msg["Authorization"]["transportProtection"] == "SMB")
871
872         server = os.environ["SERVER"]
873
874         path = "//%s/IPC$" % server
875         auth = "-N"
876         call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
877
878         messages = self.waitForMessages(isLastExpectedMessage)
879         self.assertEqual(3,
880                           len(messages),
881                           "Did not receive the expected number of messages")
882
883         # Check the first message it should be an Authentication
884         msg = messages[0]
885         self.assertEqual("Authentication", msg["type"])
886         self.assertEqual("NT_STATUS_NO_SUCH_USER",
887                           msg["Authentication"]["status"])
888         self.assertEqual("SMB2",
889                           msg["Authentication"]["serviceDescription"])
890         self.assertEqual("NTLMSSP",
891                           msg["Authentication"]["authDescription"])
892         self.assertEqual("No-Password",
893                           msg["Authentication"]["passwordType"])
894         self.assertEqual(EVT_ID_UNSUCCESSFUL_LOGON,
895                           msg["Authentication"]["eventId"])
896         self.assertEqual(EVT_LOGON_NETWORK,
897                           msg["Authentication"]["logonType"])
898
899         # Check the second message it should be an Authentication
900         msg = messages[1]
901         self.assertEqual("Authentication", msg["type"])
902         self.assertEqual("NT_STATUS_OK",
903                           msg["Authentication"]["status"])
904         self.assertEqual("SMB2",
905                           msg["Authentication"]["serviceDescription"])
906         self.assertEqual("NTLMSSP",
907                           msg["Authentication"]["authDescription"])
908         self.assertEqual("No-Password",
909                           msg["Authentication"]["passwordType"])
910         self.assertEqual("ANONYMOUS LOGON",
911                           msg["Authentication"]["becameAccount"])
912         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
913                           msg["Authentication"]["eventId"])
914         self.assertEqual(EVT_LOGON_NETWORK,
915                           msg["Authentication"]["logonType"])
916
917     def test_smb_no_krb_spnego(self):
918         def isLastExpectedMessage(msg):
919             return (msg["type"] == "Authorization" and
920                     "SMB" in msg["Authorization"]["serviceDescription"] and
921                     msg["Authorization"]["authType"] == "NTLMSSP" and
922                     msg["Authorization"]["transportProtection"] == "SMB")
923
924         creds = self.insta_creds(template=self.get_credentials(),
925                                  kerberos_state=DONT_USE_KERBEROS)
926         self.smb_connection(creds)
927
928         messages = self.waitForMessages(isLastExpectedMessage)
929         self.assertEqual(2,
930                           len(messages),
931                           "Did not receive the expected number of messages")
932         # Check the first message it should be an Authentication
933         msg = messages[0]
934         self.assertEqual("Authentication", msg["type"])
935         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
936         self.assertIn(msg["Authentication"]["serviceDescription"],
937                       ["SMB", "SMB2"])
938         self.assertEqual("NTLMSSP",
939                           msg["Authentication"]["authDescription"])
940         self.assertEqual("NTLMv2",
941                           msg["Authentication"]["passwordType"])
942         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
943                           msg["Authentication"]["eventId"])
944         self.assertEqual(EVT_LOGON_NETWORK,
945                           msg["Authentication"]["logonType"])
946
947     def test_smb_no_krb_spnego_bad_password(self):
948         def isLastExpectedMessage(msg):
949             return (msg["type"] == "Authentication" and
950                     "SMB" in msg["Authentication"]["serviceDescription"] and
951                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
952                     msg["Authentication"]["passwordType"] == "NTLMv2" and
953                     (msg["Authentication"]["status"] ==
954                         "NT_STATUS_WRONG_PASSWORD") and
955                     (msg["Authentication"]["eventId"] ==
956                         EVT_ID_UNSUCCESSFUL_LOGON) and
957                     (msg["Authentication"]["logonType"] ==
958                         EVT_LOGON_NETWORK))
959
960         creds = self.insta_creds(template=self.get_credentials(),
961                                  kerberos_state=DONT_USE_KERBEROS)
962         creds.set_password("badPassword")
963
964         thrown = False
965         try:
966             self.smb_connection(creds)
967         except NTSTATUSError:
968             thrown = True
969         self.assertEqual(thrown, True)
970
971         messages = self.waitForMessages(isLastExpectedMessage)
972         self.assertEqual(1,
973                           len(messages),
974                           "Did not receive the expected number of messages")
975
976     def test_smb_no_krb_spnego_bad_user(self):
977         def isLastExpectedMessage(msg):
978             return (msg["type"] == "Authentication" and
979                     "SMB" in msg["Authentication"]["serviceDescription"] and
980                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
981                     msg["Authentication"]["passwordType"] == "NTLMv2" and
982                     (msg["Authentication"]["status"] ==
983                         "NT_STATUS_NO_SUCH_USER") and
984                     (msg["Authentication"]["eventId"] ==
985                         EVT_ID_UNSUCCESSFUL_LOGON) and
986                     (msg["Authentication"]["logonType"] ==
987                         EVT_LOGON_NETWORK))
988
989         creds = self.insta_creds(template=self.get_credentials(),
990                                  kerberos_state=DONT_USE_KERBEROS)
991         creds.set_username("badUser")
992
993         thrown = False
994         try:
995             self.smb_connection(creds)
996         except NTSTATUSError:
997             thrown = True
998         self.assertEqual(thrown, True)
999
1000         messages = self.waitForMessages(isLastExpectedMessage)
1001         self.assertEqual(1,
1002                           len(messages),
1003                           "Did not receive the expected number of messages")
1004
1005     def test_smb_no_krb_no_spnego_no_ntlmv2(self):
1006         def isLastExpectedMessage(msg):
1007             return (msg["type"] == "Authorization" and
1008                     msg["Authorization"]["serviceDescription"] == "SMB" and
1009                     msg["Authorization"]["authType"] == "bare-NTLM" and
1010                     msg["Authorization"]["transportProtection"] == "SMB")
1011
1012         creds = self.insta_creds(template=self.get_credentials(),
1013                                  kerberos_state=DONT_USE_KERBEROS)
1014         self.smb_connection(creds,
1015                             force_smb1=True,
1016                             ntlmv2_auth="no",
1017                             use_spnego="no")
1018
1019         messages = self.waitForMessages(isLastExpectedMessage)
1020         self.assertEqual(2,
1021                           len(messages),
1022                           "Did not receive the expected number of messages")
1023         # Check the first message it should be an Authentication
1024         msg = messages[0]
1025         self.assertEqual("Authentication", msg["type"])
1026         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
1027         self.assertEqual("SMB",
1028                           msg["Authentication"]["serviceDescription"])
1029         self.assertEqual("bare-NTLM",
1030                           msg["Authentication"]["authDescription"])
1031         self.assertEqual("NTLMv1",
1032                           msg["Authentication"]["passwordType"])
1033         self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
1034                           msg["Authentication"]["eventId"])
1035         self.assertEqual(EVT_LOGON_NETWORK,
1036                           msg["Authentication"]["logonType"])
1037
1038     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
1039         def isLastExpectedMessage(msg):
1040             return (msg["type"] == "Authentication" and
1041                     msg["Authentication"]["serviceDescription"] == "SMB" and
1042                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1043                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1044                     (msg["Authentication"]["status"] ==
1045                         "NT_STATUS_WRONG_PASSWORD") and
1046                     (msg["Authentication"]["eventId"] ==
1047                         EVT_ID_UNSUCCESSFUL_LOGON) and
1048                     (msg["Authentication"]["logonType"] ==
1049                         EVT_LOGON_NETWORK))
1050
1051         creds = self.insta_creds(template=self.get_credentials(),
1052                                  kerberos_state=DONT_USE_KERBEROS)
1053         creds.set_password("badPassword")
1054
1055         thrown = False
1056         try:
1057             self.smb_connection(creds,
1058                                 force_smb1=True,
1059                                 ntlmv2_auth="no",
1060                                 use_spnego="no")
1061         except NTSTATUSError:
1062             thrown = True
1063         self.assertEqual(thrown, True)
1064
1065         messages = self.waitForMessages(isLastExpectedMessage)
1066         self.assertEqual(1,
1067                           len(messages),
1068                           "Did not receive the expected number of messages")
1069
1070     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
1071         def isLastExpectedMessage(msg):
1072             return (msg["type"] == "Authentication" and
1073                     msg["Authentication"]["serviceDescription"] == "SMB" and
1074                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
1075                     msg["Authentication"]["passwordType"] == "NTLMv1" and
1076                     (msg["Authentication"]["status"] ==
1077                         "NT_STATUS_NO_SUCH_USER") and
1078                     (msg["Authentication"]["eventId"] ==
1079                         EVT_ID_UNSUCCESSFUL_LOGON) and
1080                     (msg["Authentication"]["logonType"] ==
1081                         EVT_LOGON_NETWORK))
1082
1083         creds = self.insta_creds(template=self.get_credentials(),
1084                                  kerberos_state=DONT_USE_KERBEROS)
1085         creds.set_username("badUser")
1086
1087         thrown = False
1088         try:
1089             self.smb_connection(creds,
1090                                 force_smb1=True,
1091                                 ntlmv2_auth="no",
1092                                 use_spnego="no")
1093         except NTSTATUSError:
1094             thrown = True
1095         self.assertEqual(thrown, True)
1096
1097         messages = self.waitForMessages(isLastExpectedMessage)
1098         self.assertEqual(1,
1099                           len(messages),
1100                           "Did not receive the expected number of messages")
1101
1102     def test_samlogon_interactive(self):
1103
1104         workstation = "AuthLogTests"
1105
1106         def isLastExpectedMessage(msg):
1107             return (msg["type"] == "Authentication" and
1108                     (msg["Authentication"]["serviceDescription"] ==
1109                         "SamLogon") and
1110                     (msg["Authentication"]["authDescription"] ==
1111                         "interactive") and
1112                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1113                     (msg["Authentication"]["workstation"] ==
1114                         r"\\%s" % workstation) and
1115                     (msg["Authentication"]["eventId"] ==
1116                         EVT_ID_SUCCESSFUL_LOGON) and
1117                     (msg["Authentication"]["logonType"] ==
1118                         EVT_LOGON_INTERACTIVE))
1119
1120         server = os.environ["SERVER"]
1121         user = os.environ["USERNAME"]
1122         password = os.environ["PASSWORD"]
1123         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1124
1125         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1126
1127         messages = self.waitForMessages(isLastExpectedMessage)
1128         messages = self.remove_netlogon_messages(messages)
1129         received = len(messages)
1130         self.assertIs(True,
1131                       (received == 4 or received == 5),
1132                       "Did not receive the expected number of messages")
1133
1134     def test_samlogon_interactive_bad_password(self):
1135
1136         workstation = "AuthLogTests"
1137
1138         def isLastExpectedMessage(msg):
1139             return (msg["type"] == "Authentication" and
1140                     (msg["Authentication"]["serviceDescription"] ==
1141                         "SamLogon") and
1142                     (msg["Authentication"]["authDescription"] ==
1143                         "interactive") and
1144                     (msg["Authentication"]["status"] ==
1145                         "NT_STATUS_WRONG_PASSWORD") and
1146                     (msg["Authentication"]["workstation"] ==
1147                         r"\\%s" % workstation) and
1148                     (msg["Authentication"]["eventId"] ==
1149                         EVT_ID_UNSUCCESSFUL_LOGON) and
1150                     (msg["Authentication"]["logonType"] ==
1151                         EVT_LOGON_INTERACTIVE))
1152
1153         server = os.environ["SERVER"]
1154         user = os.environ["USERNAME"]
1155         password = "badPassword"
1156         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1157
1158         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1159
1160         messages = self.waitForMessages(isLastExpectedMessage)
1161         messages = self.remove_netlogon_messages(messages)
1162         received = len(messages)
1163         self.assertIs(True,
1164                       (received == 4 or received == 5),
1165                       "Did not receive the expected number of messages")
1166
1167     def test_samlogon_interactive_bad_user(self):
1168
1169         workstation = "AuthLogTests"
1170
1171         def isLastExpectedMessage(msg):
1172             return (msg["type"] == "Authentication" and
1173                     (msg["Authentication"]["serviceDescription"] ==
1174                         "SamLogon") and
1175                     (msg["Authentication"]["authDescription"] ==
1176                         "interactive") and
1177                     (msg["Authentication"]["status"] ==
1178                         "NT_STATUS_NO_SUCH_USER") and
1179                     (msg["Authentication"]["workstation"] ==
1180                         r"\\%s" % workstation) and
1181                     (msg["Authentication"]["eventId"] ==
1182                         EVT_ID_UNSUCCESSFUL_LOGON) and
1183                     (msg["Authentication"]["logonType"] ==
1184                         EVT_LOGON_INTERACTIVE))
1185
1186         server = os.environ["SERVER"]
1187         user = "badUser"
1188         password = os.environ["PASSWORD"]
1189         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1190
1191         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1192
1193         messages = self.waitForMessages(isLastExpectedMessage)
1194         messages = self.remove_netlogon_messages(messages)
1195         received = len(messages)
1196         self.assertIs(True,
1197                       (received == 4 or received == 5),
1198                       "Did not receive the expected number of messages")
1199
1200     def test_samlogon_network(self):
1201
1202         workstation = "AuthLogTests"
1203
1204         def isLastExpectedMessage(msg):
1205             return (msg["type"] == "Authentication" and
1206                     (msg["Authentication"]["serviceDescription"] ==
1207                         "SamLogon") and
1208                     msg["Authentication"]["authDescription"] == "network" and
1209                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1210                     (msg["Authentication"]["workstation"] ==
1211                         r"\\%s" % workstation) and
1212                     (msg["Authentication"]["eventId"] ==
1213                         EVT_ID_SUCCESSFUL_LOGON) and
1214                     (msg["Authentication"]["logonType"] ==
1215                         EVT_LOGON_NETWORK))
1216
1217         server = os.environ["SERVER"]
1218         user = os.environ["USERNAME"]
1219         password = os.environ["PASSWORD"]
1220         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1221
1222         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1223
1224         messages = self.waitForMessages(isLastExpectedMessage)
1225         messages = self.remove_netlogon_messages(messages)
1226         received = len(messages)
1227         self.assertIs(True,
1228                       (received == 4 or received == 5),
1229                       "Did not receive the expected number of messages")
1230
1231     def test_samlogon_network_bad_password(self):
1232
1233         workstation = "AuthLogTests"
1234
1235         def isLastExpectedMessage(msg):
1236             return (msg["type"] == "Authentication" and
1237                     (msg["Authentication"]["serviceDescription"] ==
1238                         "SamLogon") and
1239                     msg["Authentication"]["authDescription"] == "network" and
1240                     (msg["Authentication"]["status"] ==
1241                         "NT_STATUS_WRONG_PASSWORD") and
1242                     (msg["Authentication"]["workstation"] ==
1243                         r"\\%s" % workstation) and
1244                     (msg["Authentication"]["eventId"] ==
1245                         EVT_ID_UNSUCCESSFUL_LOGON) and
1246                     (msg["Authentication"]["logonType"] ==
1247                         EVT_LOGON_NETWORK))
1248
1249         server = os.environ["SERVER"]
1250         user = os.environ["USERNAME"]
1251         password = "badPassword"
1252         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1253
1254         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1255
1256         messages = self.waitForMessages(isLastExpectedMessage)
1257         messages = self.remove_netlogon_messages(messages)
1258         received = len(messages)
1259         self.assertIs(True,
1260                       (received == 4 or received == 5),
1261                       "Did not receive the expected number of messages")
1262
1263     def test_samlogon_network_bad_user(self):
1264
1265         workstation = "AuthLogTests"
1266
1267         def isLastExpectedMessage(msg):
1268             return ((msg["type"] == "Authentication") and
1269                     (msg["Authentication"]["serviceDescription"] ==
1270                         "SamLogon") and
1271                     (msg["Authentication"]["authDescription"] == "network") and
1272                     (msg["Authentication"]["status"] ==
1273                         "NT_STATUS_NO_SUCH_USER") and
1274                     (msg["Authentication"]["workstation"] ==
1275                         r"\\%s" % workstation) and
1276                     (msg["Authentication"]["eventId"] ==
1277                         EVT_ID_UNSUCCESSFUL_LOGON) and
1278                     (msg["Authentication"]["logonType"] ==
1279                         EVT_LOGON_NETWORK))
1280
1281         server = os.environ["SERVER"]
1282         user = "badUser"
1283         password = os.environ["PASSWORD"]
1284         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1285
1286         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1287
1288         messages = self.waitForMessages(isLastExpectedMessage)
1289         messages = self.remove_netlogon_messages(messages)
1290         received = len(messages)
1291         self.assertIs(True,
1292                       (received == 4 or received == 5),
1293                       "Did not receive the expected number of messages")
1294
1295     def test_samlogon_network_mschap(self):
1296
1297         workstation = "AuthLogTests"
1298
1299         def isLastExpectedMessage(msg):
1300             return ((msg["type"] == "Authentication") and
1301                     (msg["Authentication"]["serviceDescription"] ==
1302                         "SamLogon") and
1303                     (msg["Authentication"]["authDescription"] == "network") and
1304                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1305                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1306                     (msg["Authentication"]["workstation"] ==
1307                         r"\\%s" % workstation) and
1308                     (msg["Authentication"]["eventId"] ==
1309                         EVT_ID_SUCCESSFUL_LOGON) and
1310                     (msg["Authentication"]["logonType"] ==
1311                         EVT_LOGON_NETWORK))
1312
1313         server = os.environ["SERVER"]
1314         user = os.environ["USERNAME"]
1315         password = os.environ["PASSWORD"]
1316         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1317             user, password, workstation, 2)
1318
1319         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1320
1321         messages = self.waitForMessages(isLastExpectedMessage)
1322         messages = self.remove_netlogon_messages(messages)
1323         received = len(messages)
1324         self.assertIs(True,
1325                       (received == 4 or received == 5),
1326                       "Did not receive the expected number of messages")
1327
1328     def test_samlogon_network_mschap_bad_password(self):
1329
1330         workstation = "AuthLogTests"
1331
1332         def isLastExpectedMessage(msg):
1333             return ((msg["type"] == "Authentication") and
1334                     (msg["Authentication"]["serviceDescription"] ==
1335                         "SamLogon") and
1336                     (msg["Authentication"]["authDescription"] == "network") and
1337                     (msg["Authentication"]["status"] ==
1338                         "NT_STATUS_WRONG_PASSWORD") and
1339                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1340                     (msg["Authentication"]["workstation"] ==
1341                         r"\\%s" % workstation) and
1342                     (msg["Authentication"]["eventId"] ==
1343                         EVT_ID_UNSUCCESSFUL_LOGON) and
1344                     (msg["Authentication"]["logonType"] ==
1345                         EVT_LOGON_NETWORK))
1346
1347         server = os.environ["SERVER"]
1348         user = os.environ["USERNAME"]
1349         password = "badPassword"
1350         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1351             user, password, workstation, 2)
1352
1353         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1354
1355         messages = self.waitForMessages(isLastExpectedMessage)
1356         messages = self.remove_netlogon_messages(messages)
1357         received = len(messages)
1358         self.assertIs(True,
1359                       (received == 4 or received == 5),
1360                       "Did not receive the expected number of messages")
1361
1362     def test_samlogon_network_mschap_bad_user(self):
1363
1364         workstation = "AuthLogTests"
1365
1366         def isLastExpectedMessage(msg):
1367             return ((msg["type"] == "Authentication") and
1368                     (msg["Authentication"]["serviceDescription"] ==
1369                         "SamLogon") and
1370                     (msg["Authentication"]["authDescription"] == "network") and
1371                     (msg["Authentication"]["status"] ==
1372                         "NT_STATUS_NO_SUCH_USER") and
1373                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1374                     (msg["Authentication"]["workstation"] ==
1375                         r"\\%s" % workstation) and
1376                     (msg["Authentication"]["eventId"] ==
1377                         EVT_ID_UNSUCCESSFUL_LOGON) and
1378                     (msg["Authentication"]["logonType"] ==
1379                         EVT_LOGON_NETWORK))
1380
1381         server = os.environ["SERVER"]
1382         user = "badUser"
1383         password = os.environ["PASSWORD"]
1384         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1385             user, password, workstation, 2)
1386
1387         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1388
1389         messages = self.waitForMessages(isLastExpectedMessage)
1390         messages = self.remove_netlogon_messages(messages)
1391         received = len(messages)
1392         self.assertIs(True,
1393                       (received == 4 or received == 5),
1394                       "Did not receive the expected number of messages")
1395
1396     def test_samlogon_schannel_seal(self):
1397
1398         workstation = "AuthLogTests"
1399
1400         def isLastExpectedMessage(msg):
1401             return ((msg["type"] == "Authentication") and
1402                     (msg["Authentication"]["serviceDescription"] ==
1403                         "SamLogon") and
1404                     (msg["Authentication"]["authDescription"] == "network") and
1405                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1406                     (msg["Authentication"]["workstation"] ==
1407                         r"\\%s" % workstation) and
1408                     (msg["Authentication"]["eventId"] ==
1409                         EVT_ID_SUCCESSFUL_LOGON) and
1410                     (msg["Authentication"]["logonType"] ==
1411                         EVT_LOGON_NETWORK))
1412
1413         server = os.environ["SERVER"]
1414         user = os.environ["USERNAME"]
1415         password = os.environ["PASSWORD"]
1416         samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
1417
1418         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1419
1420         messages = self.waitForMessages(isLastExpectedMessage)
1421         messages = self.remove_netlogon_messages(messages)
1422         received = len(messages)
1423         self.assertIs(True,
1424                       (received == 4 or received == 5),
1425                       "Did not receive the expected number of messages")
1426
1427         # Check the second to last message it should be an Authorization
1428         msg = messages[-2]
1429         self.assertEqual("Authorization", msg["type"])
1430         self.assertEqual("DCE/RPC",
1431                           msg["Authorization"]["serviceDescription"])
1432         self.assertEqual("schannel", msg["Authorization"]["authType"])
1433         self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
1434         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1435
1436     # Signed logons get promoted to sealed, this test ensures that
1437     # this behaviour is not removed accidentally
1438     def test_samlogon_schannel_sign(self):
1439
1440         workstation = "AuthLogTests"
1441
1442         def isLastExpectedMessage(msg):
1443             return ((msg["type"] == "Authentication") and
1444                     (msg["Authentication"]["serviceDescription"] ==
1445                         "SamLogon") and
1446                     (msg["Authentication"]["authDescription"] == "network") and
1447                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1448                     (msg["Authentication"]["workstation"] ==
1449                         r"\\%s" % workstation) and
1450                     (msg["Authentication"]["eventId"] ==
1451                         EVT_ID_SUCCESSFUL_LOGON) and
1452                     (msg["Authentication"]["logonType"] ==
1453                         EVT_LOGON_NETWORK))
1454
1455         server = os.environ["SERVER"]
1456         user = os.environ["USERNAME"]
1457         password = os.environ["PASSWORD"]
1458         samlogon = "schannelsign;samlogon %s %s %s" % (
1459             user, password, workstation)
1460
1461         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1462
1463         messages = self.waitForMessages(isLastExpectedMessage)
1464         messages = self.remove_netlogon_messages(messages)
1465         received = len(messages)
1466         self.assertIs(True,
1467                       (received == 4 or received == 5),
1468                       "Did not receive the expected number of messages")
1469
1470         # Check the second to last message it should be an Authorization
1471         msg = messages[-2]
1472         self.assertEqual("Authorization", msg["type"])
1473         self.assertEqual("DCE/RPC",
1474                           msg["Authorization"]["serviceDescription"])
1475         self.assertEqual("schannel", msg["Authorization"]["authType"])
1476         self.assertEqual("SEAL", msg["Authorization"]["transportProtection"])
1477         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))