1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 Note that this just tests the bindings work. It does not intend to test
21 the functionality, that's already done in other tests.
24 from samba.credentials import Credentials
25 from samba import gensec, auth
29 class GensecTests(samba.tests.TestCase):
32 super(GensecTests, self).setUp()
34 self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
35 self.settings["target_hostname"] = self.lp_ctx.get("netbios name")
36 self.lp_ctx.set("spnego:simulate_w2k", "no")
38 """This is just for the API tests"""
39 self.gensec = gensec.Security.start_client(self.settings)
41 def test_start_mech_by_unknown_name(self):
42 self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo")
44 def test_start_mech_by_name(self):
45 self.gensec.start_mech_by_name("spnego")
47 def test_info_uninitialized(self):
48 self.assertRaises(RuntimeError, self.gensec.session_info)
50 def _test_update(self, mech, client_mech=None):
51 """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
53 """Start up a client and server GENSEC instance to test things with"""
55 self.gensec_client = gensec.Security.start_client(self.settings)
56 self.gensec_client.set_credentials(self.get_credentials())
57 self.gensec_client.want_feature(gensec.FEATURE_SEAL)
58 if client_mech is not None:
59 self.gensec_client.start_mech_by_name(client_mech)
61 self.gensec_client.start_mech_by_sasl_name(mech)
63 self.gensec_server = gensec.Security.start_server(settings=self.settings,
64 auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
66 creds.guess(self.lp_ctx)
67 creds.set_machine_account(self.lp_ctx)
68 self.gensec_server.set_credentials(creds)
70 self.gensec_server.want_feature(gensec.FEATURE_SEAL)
71 self.gensec_server.start_mech_by_sasl_name(mech)
73 client_finished = False
74 server_finished = False
75 server_to_client = b""
76 client_to_server = b""
78 """Run the actual call loop"""
80 if not client_finished:
81 print("running client gensec_update")
82 (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
83 if not server_finished:
84 print("running server gensec_update")
85 (server_finished, server_to_client) = self.gensec_server.update(client_to_server)
87 if client_finished and server_finished:
90 self.assertTrue(server_finished)
91 self.assertTrue(client_finished)
93 session_info = self.gensec_server.session_info()
95 test_bytes = b"Hello Server"
97 test_wrapped = self.gensec_client.wrap(test_bytes)
98 test_unwrapped = self.gensec_server.unwrap(test_wrapped)
99 except samba.NTSTATUSError as e:
102 self.assertEqual(test_bytes, test_unwrapped)
103 test_bytes = b"Hello Client"
104 test_wrapped = self.gensec_server.wrap(test_bytes)
105 test_unwrapped = self.gensec_client.unwrap(test_wrapped)
106 self.assertEqual(test_bytes, test_unwrapped)
108 client_session_key = self.gensec_client.session_key()
109 server_session_key = self.gensec_server.session_key()
110 self.assertEqual(client_session_key, server_session_key)
112 def test_update(self):
113 self._test_update("GSSAPI")
115 def test_update_spnego(self):
116 self._test_update("GSS-SPNEGO")
118 def test_update_w2k_spnego_client(self):
119 self.lp_ctx.set("spnego:simulate_w2k", "yes")
121 # Re-start the client with this set
122 self.gensec = gensec.Security.start_client(self.settings)
124 # Unset it for the server
125 self.lp_ctx.set("spnego:simulate_w2k", "no")
127 self._test_update("GSS-SPNEGO")
129 def test_update_w2k_spnego_server(self):
130 # Re-start the client with this set
131 self.gensec = gensec.Security.start_client(self.settings)
133 # Unset it for the server
134 self.lp_ctx.set("spnego:simulate_w2k", "yes")
136 self._test_update("GSS-SPNEGO")
138 def test_update_w2k_spnego(self):
139 self.lp_ctx.set("spnego:simulate_w2k", "no")
141 # Re-start the client with this set
142 self.gensec = gensec.Security.start_client(self.settings)
144 self._test_update("GSS-SPNEGO")
146 def test_update_gss_krb5_to_spnego(self):
147 self._test_update("GSS-SPNEGO", "gssapi_krb5")
149 def test_update_ntlmssp_to_spnego(self):
150 self._test_update("GSS-SPNEGO", "ntlmssp")
152 def test_max_update_size(self):
153 """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
155 """Start up a client and server GENSEC instance to test things with"""
157 self.gensec_client = gensec.Security.start_client(self.settings)
158 self.gensec_client.set_credentials(self.get_credentials())
159 self.gensec_client.want_feature(gensec.FEATURE_SIGN)
160 self.gensec_client.set_max_update_size(5)
161 self.gensec_client.start_mech_by_name("spnego")
163 self.gensec_server = gensec.Security.start_server(settings=self.settings,
164 auth_context=auth.AuthContext(lp_ctx=self.lp_ctx))
165 creds = Credentials()
166 creds.guess(self.lp_ctx)
167 creds.set_machine_account(self.lp_ctx)
168 self.gensec_server.set_credentials(creds)
169 self.gensec_server.want_feature(gensec.FEATURE_SIGN)
170 self.gensec_server.set_max_update_size(5)
171 self.gensec_server.start_mech_by_name("spnego")
173 client_finished = False
174 server_finished = False
175 server_to_client = b""
177 """Run the actual call loop"""
179 while not client_finished or not server_finished:
181 if not client_finished:
182 print("running client gensec_update: %d: %r" % (len(server_to_client), server_to_client))
183 (client_finished, client_to_server) = self.gensec_client.update(server_to_client)
184 if not server_finished:
185 print("running server gensec_update: %d: %r" % (len(client_to_server), client_to_server))
186 (server_finished, server_to_client) = self.gensec_server.update(client_to_server)
188 """Here we expect a lot more than the typical 1 or 2 roundtrips"""
189 self.assertTrue(i > 10)
191 session_info = self.gensec_server.session_info()
193 test_bytes = b"Hello Server"
194 test_wrapped = self.gensec_client.wrap(test_bytes)
195 test_unwrapped = self.gensec_server.unwrap(test_wrapped)
196 self.assertEqual(test_bytes, test_unwrapped)
197 test_bytes = b"Hello Client"
198 test_wrapped = self.gensec_server.wrap(test_bytes)
199 test_unwrapped = self.gensec_client.unwrap(test_wrapped)
200 self.assertEqual(test_bytes, test_unwrapped)
202 client_session_key = self.gensec_client.session_key()
203 server_session_key = self.gensec_server.session_key()
204 self.assertEqual(client_session_key, server_session_key)