1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
27 from samba.tests import BlackboxTestCase
29 class NTLMAuthTestCase(BlackboxTestCase):
33 bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
34 self.ntlm_auth_path = os.path.join(bindir, 'ntlm_auth')
35 self.lp = samba.tests.env_loadparm()
36 self.winbind_separator = self.lp.get('winbind separator')
38 def readLine(self, text_stream):
39 buf = text_stream.readline()
40 newline = buf.find('\n')
42 raise Exception("Failed to read line")
45 def writeLine(self, text_stream, buf):
46 text_stream.write(buf)
47 text_stream.write("\n")
53 client_use_cached_creds=False,
57 client_helper="ntlmssp-client-1",
58 server_helper="squid-2.5-ntlmssp",
59 server_use_winbind=False,
60 require_membership=None,
63 self.assertTrue(os.access(self.ntlm_auth_path, os.X_OK))
65 if client_username is None:
66 raise Exception("client_username required")
70 client_args.append(self.ntlm_auth_path)
71 client_args.append("--helper-protocol=%s" % client_helper)
72 client_args.append("--username=%s" % client_username)
74 client_args.append("--domain=%s" % client_domain)
75 if client_use_cached_creds:
76 client_args.append("--use-cached-creds")
78 if client_password is None:
79 raise Exception("client_password required")
80 client_args.append("--password=%s" % client_password)
82 client_args.append("--target-service=%s" % target_service)
84 client_args.append("--target-hostname=%s" % target_hostname)
85 client_args.append("--configfile=%s" % self.lp.configfile)
89 server_args.append(self.ntlm_auth_path)
90 server_args.append("--helper-protocol=%s" % server_helper)
91 server_args.append("--configfile=%s" % self.lp.configfile)
92 if not server_use_winbind:
93 if server_username is None or server_password is None or server_domain is None:
94 raise Exception("Server credentials required if not using winbind")
95 server_args.append("--username=%s" % server_username)
96 server_args.append("--password=%s" % server_password)
97 server_args.append("--domain=%s" % server_domain)
98 if require_membership is not None:
99 raise Exception("Server must be using winbind for require-membership-of")
101 if require_membership is not None:
102 server_args.append("--require-membership-of=%s" % require_membership)
106 server_proc = subprocess.Popen(server_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0, universal_newlines=True)
107 client_proc = subprocess.Popen(client_args, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0, universal_newlines=True)
110 if client_helper == "ntlmssp-client-1" and server_helper == "squid-2.5-ntlmssp":
111 self.writeLine(client_proc.stdin, "YR")
112 buf = self.readLine(client_proc.stdout)
113 self.assertTrue(buf.startswith("YR "))
115 self.writeLine(server_proc.stdin, buf)
116 buf = self.readLine(server_proc.stdout)
117 self.assertTrue(buf.startswith("TT "))
119 self.writeLine(client_proc.stdin, buf)
120 buf = self.readLine(client_proc.stdout)
121 self.assertTrue(buf.startswith("AF "))
123 # Client sends 'AF <base64 blob>' but server
124 # expects 'KK <base64 blob>'
125 buf = buf.replace("AF", "KK", 1)
127 self.writeLine(server_proc.stdin, buf)
128 buf = self.readLine(server_proc.stdout)
129 result = buf.startswith("AF ")
130 elif client_helper == "ntlmssp-client-1" and server_helper == "gss-spnego":
131 self.writeLine(client_proc.stdin, "YR")
132 buf = self.readLine(client_proc.stdout)
133 self.assertTrue(buf.startswith("YR "))
135 self.writeLine(server_proc.stdin, buf)
136 buf = self.readLine(server_proc.stdout)
137 self.assertTrue(buf.startswith("TT "))
139 self.writeLine(client_proc.stdin, buf)
140 buf = self.readLine(client_proc.stdout)
141 self.assertTrue(buf.startswith("AF "))
143 # Client sends 'AF <base64 blob>' but server expects 'KK <abse64 blob>'
144 buf = buf.replace("AF", "KK", 1)
146 self.writeLine(server_proc.stdin, buf)
147 buf = self.readLine(server_proc.stdout)
148 result = buf.startswith("AF * ")
149 elif client_helper == "gss-spnego-client" and server_helper == "gss-spnego":
150 self.writeLine(server_proc.stdin, "YR")
151 buf = self.readLine(server_proc.stdout)
154 if (buf.startswith("NA * ")):
158 self.assertTrue(buf.startswith("AF ") or buf.startswith("TT "))
160 self.writeLine(client_proc.stdin, buf)
161 buf = self.readLine(client_proc.stdout)
163 if buf.startswith("AF"):
167 self.assertTrue(buf.startswith("AF ") or buf.startswith("KK ") or buf.startswith("TT "))
169 self.writeLine(server_proc.stdin, buf)
170 buf = self.readLine(server_proc.stdout)
172 if buf.startswith("AF * "):
176 self.fail("Helper protocols not handled")
178 if result is True and client_helper == "ntlmssp-client-1":
179 self.writeLine(client_proc.stdin, "GK")
180 buf = self.readLine(client_proc.stdout)
181 self.assertTrue(buf.startswith("GK "))
183 self.writeLine(client_proc.stdin, "GF")
184 buf = self.readLine(client_proc.stdout)
185 self.assertTrue(buf.startswith("GF "))
187 if result is True and server_helper == "squid-2.5-ntlmssp":
188 self.writeLine(server_proc.stdin, "GK")
189 buf = self.readLine(server_proc.stdout)
190 self.assertTrue(buf.startswith("GK "))
192 self.writeLine(server_proc.stdin, "GF")
193 buf = self.readLine(server_proc.stdout)
194 self.assertTrue(buf.startswith("GF "))
196 client_proc.stdin.close()
198 self.assertEqual(client_proc.returncode, 0)
200 server_proc.stdin.close()
202 self.assertEqual(server_proc.returncode, 0)