tests auth_log: Modify existing tests to handle NETLOGON messages
[vlendec/samba-autobuild/.git] / python / samba / tests / auth_log_samlogon.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """
19     Auth logging tests that exercise SamLogon
20 """
21
22 from samba import auth
23 import samba.tests
24 from samba.messaging import Messaging
25 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
26 import time
27 import json
28 import os
29 from samba import smb
30 from samba.samdb import SamDB
31 import samba.tests.auth_log_base
32 from samba.credentials import (
33     Credentials,
34     DONT_USE_KERBEROS,
35     CLI_CRED_NTLMv2_AUTH
36 )
37 from samba.dcerpc import ntlmssp, netlogon
38 from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
39 from samba.ndr import ndr_pack
40 from samba.auth import system_session
41 from samba.tests import delete_force
42 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
43 from samba.dcerpc.misc import SEC_CHAN_WKSTA
44
class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
    """Auth-log tests that perform a netlogon SamLogon over ncalrpc.

    Each test creates a temporary workstation account, opens an schannel
    protected netlogon connection with it, performs a network (NTLMv2)
    SamLogon for the supplied user credentials and then inspects the
    authentication/authorization log messages that were collected.

    The ``DOMAIN`` and ``SERVER`` environment variables must be set.
    """

    def setUp(self):
        """Open the SamDB as system and record the test account details."""
        super(AuthLogTestsSamLogon, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()

        self.session = system_session()
        self.ldb = SamDB(
            session_info=self.session,
            credentials=self.creds,
            lp=self.lp)

        self.domain = os.environ["DOMAIN"]
        self.netbios_name = "SamLogonTest"
        self.machinepass = "abcdefghij"
        self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
        self.base_dn = self.ldb.domain_dn()
        self.samlogon_dn = ("cn=%s,cn=users,%s" %
                            (self.netbios_name, self.base_dn))

    def tearDown(self):
        """Run the base-class teardown, then remove the machine account."""
        super(AuthLogTestsSamLogon, self).tearDown()
        delete_force(self.ldb, self.samlogon_dn)

    def _test_samlogon(self, binding, creds, checkFunction):
        """Perform an NTLMv2 network SamLogon and check the logged messages.

        :param binding: extra ncalrpc binding option (e.g. "SEAL") appended
            to "schannel"; a false value means plain "[schannel]".
        :param creds: credentials of the user being logged on; used to
            compute the NTLM response and the identity information.
        :param checkFunction: callable invoked with the list of collected
            messages once the final expected message has been seen.
        """

        def isLastExpectedMessage(msg):
            # The SamLogon network authentication is the last message of
            # interest; seeing it ends the wait for further messages.
            return (
                msg["type"] == "Authentication" and
                msg["Authentication"]["serviceDescription"] == "SamLogon" and
                msg["Authentication"]["authDescription"] == "network" and
                msg["Authentication"]["passwordType"] == "NTLMv2")

        if binding:
            binding = "[schannel,%s]" % binding
        else:
            binding = "[schannel]"

        # unicodePwd takes the password wrapped in double quotes and encoded
        # as UTF-16-LE.  Encoding the text directly works on both Python 2
        # and Python 3 (the previous code relied on the py2-only unicode()).
        utf16pw = ('"' + self.machinepass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.samlogon_dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.netbios_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

        # Credentials for the machine account that was just created.
        machine_creds = Credentials()
        machine_creds.guess(self.get_loadparm())
        machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        machine_creds.set_password(self.machinepass)
        machine_creds.set_username(self.netbios_name + "$")

        netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
                                          self.get_loadparm(),
                                          machine_creds)
        challenge = b"abcdefgh"

        # Target info fed into the NTLMv2 response: NetBIOS domain name,
        # NetBIOS computer name and the end-of-list marker.
        domainname = ntlmssp.AV_PAIR()
        domainname.AvId = ntlmssp.MsvAvNbDomainName
        domainname.Value = self.domain

        computername = ntlmssp.AV_PAIR()
        computername.AvId = ntlmssp.MsvAvNbComputerName
        computername.Value = self.netbios_name

        eol = ntlmssp.AV_PAIR()
        eol.AvId = ntlmssp.MsvAvEOL

        pairs = [domainname, computername, eol]
        target_info = ntlmssp.AV_PAIR_LIST()
        target_info.count = len(pairs)
        target_info.pair = pairs

        target_info_blob = ndr_pack(target_info)

        response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
                                           challenge=challenge,
                                           target_info=target_info_blob)

        netr_flags = 0

        logon_level = netlogon.NetlogonNetworkTransitiveInformation
        logon = samba.dcerpc.netlogon.netr_NetworkInfo()

        # The NDR structures want lists of integer byte values.  Going via
        # bytearray yields ints on both Python 2 (str) and Python 3 (bytes),
        # where ord() on an already-int element would raise TypeError.
        logon.challenge = list(bytearray(challenge))
        logon.nt = netlogon.netr_ChallengeResponse()
        logon.nt.length = len(response["nt_response"])
        logon.nt.data = list(bytearray(response["nt_response"]))
        logon.identity_info = samba.dcerpc.netlogon.netr_IdentityInfo()
        (username, domain) = creds.get_ntlm_username_domain()

        logon.identity_info.domain_name.string = domain
        logon.identity_info.account_name.string = username
        logon.identity_info.workstation.string = creds.get_workstation()

        validation_level = samba.dcerpc.netlogon.NetlogonValidationSamInfo4

        # Only the log messages triggered by the call are of interest here;
        # the returned validation data is deliberately not inspected.
        netlogon_conn.netr_LogonSamLogonEx(os.environ["SERVER"],
                                           machine_creds.get_workstation(),
                                           logon_level, logon,
                                           validation_level, netr_flags)

        messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
        checkFunction(messages)

    def samlogon_check(self, messages):
        """Validate the messages collected for an ncalrpc SamLogon.

        The messages are first passed through remove_netlogon_messages()
        (defined outside this class); five messages are expected to remain
        and the first of them must be the DCE/RPC ncalrpc Authorization.
        """
        messages = self.remove_netlogon_messages(messages)
        expected_messages = 5
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")

        # Check the first message it should be an Authorization
        msg = messages[0]
        self.assertEqual("Authorization", msg["type"])
        self.assertEqual("DCE/RPC",
                         msg["Authorization"]["serviceDescription"])
        self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
        self.assertEqual("NONE", msg["Authorization"]["transportProtection"])

    def test_ncalrpc_samlogon(self):
        """SamLogon over a sealed schannel ncalrpc connection, no Kerberos."""
        creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=DONT_USE_KERBEROS)
        try:
            self._test_samlogon("SEAL", creds, self.samlogon_check)
        except Exception as e:
            self.fail("Unexpected exception: " + str(e))