8e86c646a008909202405df742fadf0e0cea9435
[samba.git] / python / samba / tests / auth_log_samlogon.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """
19     Tests auth logging tests that exercise SamLogon
20 """
21
22 import samba.tests
23 import os
24 from samba.samdb import SamDB
25 import samba.tests.auth_log_base
26 from samba.credentials import (
27     Credentials,
28     DONT_USE_KERBEROS,
29     CLI_CRED_NTLMv2_AUTH
30 )
31 from samba.dcerpc import ntlmssp, netlogon
32 from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
33 from samba.ndr import ndr_pack
34 from samba.auth import system_session
35 from samba.tests import delete_force
36 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
37 from samba.dcerpc.misc import SEC_CHAN_WKSTA
38 from samba.dcerpc.windows_event_ids import (
39     EVT_ID_SUCCESSFUL_LOGON,
40     EVT_LOGON_NETWORK
41 )
42
43
44 class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
45
46     def setUp(self):
47         super(AuthLogTestsSamLogon, self).setUp()
48         self.lp = samba.tests.env_loadparm()
49         self.session = system_session()
50         self.ldb = SamDB(
51             session_info=self.session,
52             lp=self.lp)
53
54         self.domain = os.environ["DOMAIN"]
55         self.netbios_name = "SamLogonTest"
56         self.machinepass = "abcdefghij"
57         self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
58         self.base_dn = self.ldb.domain_dn()
59         self.samlogon_dn = ("cn=%s,cn=users,%s" %
60                            (self.netbios_name, self.base_dn))
61
62     def tearDown(self):
63         super(AuthLogTestsSamLogon, self).tearDown()
64         delete_force(self.ldb, self.samlogon_dn)
65
66     def _test_samlogon(self, binding, creds, checkFunction):
67
68         def isLastExpectedMessage(msg):
69             return (
70                 msg["type"] == "Authentication" and
71                 msg["Authentication"]["serviceDescription"] == "SamLogon" and
72                 msg["Authentication"]["authDescription"] == "network" and
73                 msg["Authentication"]["passwordType"] == "NTLMv2" and
74                 (msg["Authentication"]["eventId"] ==
75                     EVT_ID_SUCCESSFUL_LOGON) and
76                 (msg["Authentication"]["logonType"] == EVT_LOGON_NETWORK))
77
78         if binding:
79             binding = "[schannel,%s]" % binding
80         else:
81             binding = "[schannel]"
82
83         utf16pw = ('"' + self.machinepass + '"').encode('utf-16-le')
84         self.ldb.add({
85             "dn": self.samlogon_dn,
86             "objectclass": "computer",
87             "sAMAccountName": "%s$" % self.netbios_name,
88             "userAccountControl":
89                 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
90             "unicodePwd": utf16pw})
91
92         machine_creds = Credentials()
93         machine_creds.guess(self.get_loadparm())
94         machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
95         machine_creds.set_password(self.machinepass)
96         machine_creds.set_username(self.netbios_name + "$")
97
98         netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
99                                           self.get_loadparm(),
100                                           machine_creds)
101         challenge = b"abcdefgh"
102
103         target_info = ntlmssp.AV_PAIR_LIST()
104         target_info.count = 3
105
106         domainname = ntlmssp.AV_PAIR()
107         domainname.AvId = ntlmssp.MsvAvNbDomainName
108         domainname.Value = self.domain
109
110         computername = ntlmssp.AV_PAIR()
111         computername.AvId = ntlmssp.MsvAvNbComputerName
112         computername.Value = self.netbios_name
113
114         eol = ntlmssp.AV_PAIR()
115         eol.AvId = ntlmssp.MsvAvEOL
116         target_info.pair = [domainname, computername, eol]
117
118         target_info_blob = ndr_pack(target_info)
119
120         response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
121                                            challenge=challenge,
122                                            target_info=target_info_blob)
123
124         netr_flags = 0
125
126         logon_level = netlogon.NetlogonNetworkTransitiveInformation
127         logon = samba.dcerpc.netlogon.netr_NetworkInfo()
128
129         logon.challenge = [
130             x if isinstance(x, int) else ord(x) for x in challenge]
131         logon.nt = netlogon.netr_ChallengeResponse()
132         logon.nt.length = len(response["nt_response"])
133         logon.nt.data = [
134             x if isinstance(x, int) else ord(x) for
135             x in response["nt_response"]
136         ]
137         logon.identity_info = samba.dcerpc.netlogon.netr_IdentityInfo()
138         (username, domain) = creds.get_ntlm_username_domain()
139
140         logon.identity_info.domain_name.string = domain
141         logon.identity_info.account_name.string = username
142         logon.identity_info.workstation.string = creds.get_workstation()
143
144         validation_level = samba.dcerpc.netlogon.NetlogonValidationSamInfo4
145
146         result = netlogon_conn.netr_LogonSamLogonEx(
147             os.environ["SERVER"],
148             machine_creds.get_workstation(),
149             logon_level, logon,
150             validation_level, netr_flags)
151
152         (validation, authoritative, netr_flags_out) = result
153
154         messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
155         checkFunction(messages)
156
157     def samlogon_check(self, messages):
158
159         messages = self.remove_netlogon_messages(messages)
160         expected_messages = 5
161         self.assertEqual(expected_messages,
162                           len(messages),
163                           "Did not receive the expected number of messages")
164
165         # Check the first message it should be an Authorization
166         msg = messages[0]
167         self.assertEqual("Authorization", msg["type"])
168         self.assertEqual("DCE/RPC",
169                           msg["Authorization"]["serviceDescription"])
170         self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
171         self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
172         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
173
174     def test_ncalrpc_samlogon(self):
175
176         creds = self.insta_creds(template=self.get_credentials(),
177                                  kerberos_state=DONT_USE_KERBEROS)
178         try:
179             self._test_samlogon("SEAL", creds, self.samlogon_check)
180         except Exception as e:
181             self.fail("Unexpected exception: " + str(e))