1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 Note that this just tests the bindings work. It does not intend to test
21 the functionality, that's already done in other tests.
24 from samba.credentials import Credentials
25 from samba import gensec, auth
28 class GensecTests(samba.tests.TestCase):
31 super(GensecTests, self).setUp()
33 self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
34 self.settings["target_hostname"] = self.lp_ctx.get("netbios name")
35 self.lp_ctx.set("spnego:simulate_w2k", "no")
37 """This is just for the API tests"""
38 self.gensec = gensec.Security.start_client(self.settings)
40 def test_start_mech_by_unknown_name(self):
41 self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo")
43 def test_start_mech_by_name(self):
44 self.gensec.start_mech_by_name("spnego")
46 def test_info_uninitialized(self):
47 self.assertRaises(RuntimeError, self.gensec.session_info)
49 def _test_update(self, mech, client_mech=None):
50 """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
52 """Start up a client and server GENSEC instance to test things with"""
54 self.gensec_client = gensec.Security.start_client(self.settings)
55 self.gensec_client.set_credentials(self.get_credentials())
56 self.gensec_client.want_feature(gensec.FEATURE_SEAL)
57 if client_mech is not None:
58 self.gensec_client.start_mech_by_name(client_mech)
60 self.gensec_client.start_mech_by_sasl_name(mech)
62 self.gensec_server = gensec.Security.start_server(settings=self.settings,
63 auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
65 creds.guess(self.lp_ctx)
66 creds.set_machine_account(self.lp_ctx)
67 self.gensec_server.set_credentials(creds)
69 self.gensec_server.want_feature(gensec.FEATURE_SEAL)
70 self.gensec_server.start_mech_by_sasl_name(mech)
72 client_finished = False
73 server_finished = False
74 server_to_client = b""
75 client_to_server = b""
77 """Run the actual call loop"""
79 if not client_finished:
80 print("running client gensec_update")
81 (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
82 if not server_finished:
83 print("running server gensec_update")
84 (server_finished, server_to_client) = self.gensec_server.update(client_to_server)
86 if client_finished and server_finished:
89 self.assertTrue(server_finished)
90 self.assertTrue(client_finished)
92 session_info = self.gensec_server.session_info()
94 test_bytes = b"Hello Server"
96 test_wrapped = self.gensec_client.wrap(test_bytes)
97 test_unwrapped = self.gensec_server.unwrap(test_wrapped)
98 except samba.NTSTATUSError as e:
101 self.assertEqual(test_bytes, test_unwrapped)
102 test_bytes = b"Hello Client"
103 test_wrapped = self.gensec_server.wrap(test_bytes)
104 test_unwrapped = self.gensec_client.unwrap(test_wrapped)
105 self.assertEqual(test_bytes, test_unwrapped)
107 client_session_key = self.gensec_client.session_key()
108 server_session_key = self.gensec_server.session_key()
109 self.assertEqual(client_session_key, server_session_key)
111 def test_update(self):
112 self._test_update("GSSAPI")
114 def test_update_spnego(self):
115 self._test_update("GSS-SPNEGO")
117 def test_update_w2k_spnego_client(self):
118 self.lp_ctx.set("spnego:simulate_w2k", "yes")
120 # Re-start the client with this set
121 self.gensec = gensec.Security.start_client(self.settings)
123 # Unset it for the server
124 self.lp_ctx.set("spnego:simulate_w2k", "no")
126 self._test_update("GSS-SPNEGO")
128 def test_update_w2k_spnego_server(self):
129 # Re-start the client with this set
130 self.gensec = gensec.Security.start_client(self.settings)
132 # Unset it for the server
133 self.lp_ctx.set("spnego:simulate_w2k", "yes")
135 self._test_update("GSS-SPNEGO")
137 def test_update_w2k_spnego(self):
138 self.lp_ctx.set("spnego:simulate_w2k", "no")
140 # Re-start the client with this set
141 self.gensec = gensec.Security.start_client(self.settings)
143 self._test_update("GSS-SPNEGO")
145 def test_update_gss_krb5_to_spnego(self):
146 self._test_update("GSS-SPNEGO", "gssapi_krb5")
148 def test_update_ntlmssp_to_spnego(self):
149 self._test_update("GSS-SPNEGO", "ntlmssp")
152 def test_max_update_size(self):
153 """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
155 """Start up a client and server GENSEC instance to test things with"""
157 self.gensec_client = gensec.Security.start_client(self.settings)
158 self.gensec_client.set_credentials(self.get_credentials())
159 self.gensec_client.want_feature(gensec.FEATURE_SIGN)
160 self.gensec_client.set_max_update_size(5)
161 self.gensec_client.start_mech_by_name("spnego")
163 self.gensec_server = gensec.Security.start_server(settings=self.settings,
164 auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
165 creds = Credentials()
166 creds.guess(self.lp_ctx)
167 creds.set_machine_account(self.lp_ctx)
168 self.gensec_server.set_credentials(creds)
169 self.gensec_server.want_feature(gensec.FEATURE_SIGN)
170 self.gensec_server.set_max_update_size(5)
171 self.gensec_server.start_mech_by_name("spnego")
173 client_finished = False
174 server_finished = False
175 server_to_client = b""
177 """Run the actual call loop"""
179 while not client_finished or not server_finished:
181 if not client_finished:
182 print("running client gensec_update: %d: %r" % (len(server_to_client), server_to_client))
183 (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
184 if not server_finished:
185 print("running server gensec_update: %d: %r" % (len(client_to_server), client_to_server))
186 (server_finished, server_to_client) = self.gensec_server.update(client_to_server)
188 """Here we expect a lot more than the typical 1 or 2 roundtrips"""
189 self.assertTrue(i > 10)
191 session_info = self.gensec_server.session_info()
193 test_bytes = b"Hello Server"
194 test_wrapped = self.gensec_client.wrap(test_bytes)
195 test_unwrapped = self.gensec_server.unwrap(test_wrapped)
196 self.assertEqual(test_bytes, test_unwrapped)
197 test_bytes = b"Hello Client"
198 test_wrapped = self.gensec_server.wrap(test_bytes)
199 test_unwrapped = self.gensec_client.unwrap(test_wrapped)
200 self.assertEqual(test_bytes, test_unwrapped)
202 client_session_key = self.gensec_client.session_key()
203 server_session_key = self.gensec_server.session_key()
204 self.assertEqual(client_session_key, server_session_key)