traffic: new version of model with packet_rate, version number
[samba.git] / python / samba / tests / auth_log_netlogon.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 # Copyright (C) Catalyst IT Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """
20     Tests that exercise the auth logging for a successful netlogon attempt
21
22     NOTE: As the netlogon authentication is performed once per session,
23           there is only one test in this routine.  If another test is added
24           only the test executed first will generate the netlogon auth message
25 """
26
27 import samba.tests
28 import os
29 from samba.samdb import SamDB
30 import samba.tests.auth_log_base
31 from samba.credentials import Credentials
32 from samba.dcerpc import netlogon
33 from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
34 from samba.auth import system_session
35 from samba.tests import delete_force
36 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
37 from samba.dcerpc.misc import SEC_CHAN_WKSTA
38 from samba.compat import text_type
39 from samba.dcerpc.windows_event_ids import (
40     EVT_ID_SUCCESSFUL_LOGON,
41     EVT_LOGON_NETWORK
42 )
43
44
class AuthLogTestsNetLogon(samba.tests.auth_log_base.AuthLogTestBase):
    """Auth-log test for a successful schannel NETLOGON over ncalrpc.

    setUp adds a workstation trust account to the SamDB; the test then
    opens a netlogon connection with that account's credentials and
    inspects the messages collected by waitForMessages.
    """

    def setUp(self):
        """Open the SamDB and create the machine account used by the test."""
        super(AuthLogTestsNetLogon, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()

        self.session = system_session()
        self.ldb = SamDB(
            session_info=self.session,
            credentials=self.creds,
            lp=self.lp)

        self.domain = os.environ["DOMAIN"]
        self.netbios_name = "NetLogonGood"
        self.machinepass = "abcdefghij"
        # NOTE(review): not read in this class; presumably consumed by
        # AuthLogTestBase when selecting messages -- confirm.
        self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
        self.base_dn = self.ldb.domain_dn()
        self.dn = ("cn=%s,cn=users,%s" % (self.netbios_name, self.base_dn))

        # unicodePwd is supplied as the double-quoted password encoded
        # as UTF-16-LE.
        utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.netbios_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

    def tearDown(self):
        """Run the base class tearDown, then delete the machine account."""
        super(AuthLogTestsNetLogon, self).tearDown()
        delete_force(self.ldb, self.dn)

    def _test_netlogon(self, binding, checkFunction):
        """Open a schannel netlogon connection and check the messages.

        :param binding: extra binding option(s) appended after "schannel"
            in the ncalrpc binding string; a false value means schannel
            only.
        :param checkFunction: callable invoked with the list of messages
            returned by waitForMessages.
        """

        def isLastExpectedMessage(msg):
            # True for the DCE/RPC Authorization message that uses
            # schannel with SEAL transport protection.
            return (
                msg["type"] == "Authorization" and
                msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
                msg["Authorization"]["authType"] == "schannel" and
                msg["Authorization"]["transportProtection"] == "SEAL")

        if binding:
            binding = "[schannel,%s]" % binding
        else:
            binding = "[schannel]"

        # Authenticate as the workstation account created in setUp.
        machine_creds = Credentials()
        machine_creds.guess(self.get_loadparm())
        machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        machine_creds.set_password(self.machinepass)
        machine_creds.set_username(self.netbios_name + "$")

        netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
                                          self.get_loadparm(),
                                          machine_creds)

        # NOTE(review): the connection is handed to waitForMessages,
        # presumably so it stays open while messages are collected --
        # confirm against AuthLogTestBase.
        messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
        checkFunction(messages)

    def netlogon_check(self, messages):
        """Verify the messages logged for a successful netlogon.

        Exactly five messages are expected; only the first and the fourth
        are inspected in detail.

        :param messages: list of decoded message dictionaries.
        """

        expected_messages = 5
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")

        # Check the first message it should be an Authorization
        msg = messages[0]
        self.assertEqual("Authorization", msg["type"])
        self.assertEqual("DCE/RPC",
                         msg["Authorization"]["serviceDescription"])
        self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
        self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
        self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

        # Check the fourth message it should be a NETLOGON Authentication
        msg = messages[3]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NETLOGON",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual("ServerAuthenticate",
                         msg["Authentication"]["authDescription"])
        self.assertEqual("NT_STATUS_OK",
                         msg["Authentication"]["status"])
        self.assertEqual("HMAC-SHA256",
                         msg["Authentication"]["passwordType"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                         msg["Authentication"]["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK,
                         msg["Authentication"]["logonType"])

    # Single test in this class: netlogon over ncalrpc with schannel and
    # SEAL, validated by netlogon_check.
    def test_netlogon(self):
        self._test_netlogon("SEAL", self.netlogon_check)