python: tests: update all super calls to python 3 style in tests
[samba.git] / python / samba / tests / ntlm_auth_base.py
1 # Unix SMB/CIFS implementation.
2 # A test for the ntlm_auth tool
3 # Copyright (C) Kai Blin <kai@samba.org> 2008
4 # Copyright (C) Samuel Cabrero <scabrero@suse.de> 2018
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19 """Test ntlm_auth
20 This test program will start ntlm_auth with the given command line switches and
21 see if it will get the expected results.
22 """
23
24 import os
25 import samba
26 import subprocess
27 from samba.tests import BlackboxTestCase
28
class NTLMAuthTestCase(BlackboxTestCase):
    """Base class for tests that drive a pair of ntlm_auth helper processes.

    One ntlm_auth process is started as the "client" helper and another as
    the "server" helper; run_helper() relays the line-based helper protocol
    between them and reports whether the authentication succeeded.
    """

    def setUp(self):
        """Resolve the ntlm_auth binary path and load the test configuration.

        The binary is looked up in $BINDIR (default "./bin").
        """
        super().setUp()
        bindir = os.path.normpath(os.getenv("BINDIR", "./bin"))
        self.ntlm_auth_path = os.path.join(bindir, 'ntlm_auth')
        self.lp = samba.tests.env_loadparm()
        self.winbind_separator = self.lp.get('winbind separator')

    def readLine(self, text_stream):
        """Read one line from text_stream and return it without the newline.

        Raises Exception if no newline-terminated data could be read, which
        is what readline() yields once the peer has closed its end.
        """
        buf = text_stream.readline()
        newline = buf.find('\n')
        if newline == -1:
            raise Exception("Failed to read line")
        return buf[:newline]

    def writeLine(self, text_stream, buf):
        """Write buf followed by a newline to text_stream."""
        text_stream.write(buf)
        text_stream.write("\n")

    def _client_args(self, client_helper, client_username, client_password,
                     client_domain, client_use_cached_creds,
                     target_hostname, target_service):
        """Build the argv list for the client-side ntlm_auth process."""
        args = [
            self.ntlm_auth_path,
            "--helper-protocol=%s" % client_helper,
            "--username=%s" % client_username,
        ]
        if client_domain:
            args.append("--domain=%s" % client_domain)
        if client_use_cached_creds:
            args.append("--use-cached-creds")
        else:
            # Without cached credentials an explicit password is mandatory.
            if client_password is None:
                raise Exception("client_password required")
            args.append("--password=%s" % client_password)
        if target_service:
            args.append("--target-service=%s" % target_service)
        if target_hostname:
            args.append("--target-hostname=%s" % target_hostname)
        args.append("--configfile=%s" % self.lp.configfile)
        return args

    def _server_args(self, server_helper, server_username, server_password,
                     server_domain, server_use_winbind, require_membership):
        """Build the argv list for the server-side ntlm_auth process."""
        args = [
            self.ntlm_auth_path,
            "--helper-protocol=%s" % server_helper,
            "--configfile=%s" % self.lp.configfile,
        ]
        if not server_use_winbind:
            if server_username is None or server_password is None or server_domain is None:
                raise Exception("Server credentials required if not using winbind")
            args.append("--username=%s" % server_username)
            args.append("--password=%s" % server_password)
            args.append("--domain=%s" % server_domain)
            if require_membership is not None:
                raise Exception("Server must be using winbind for require-membership-of")
        else:
            if require_membership is not None:
                args.append("--require-membership-of=%s" % require_membership)
        return args

    def _ntlmssp_exchange(self, client_proc, server_proc, success_prefix):
        """Relay an exchange started by the ntlmssp-client-1 helper.

        Returns True if the server's final reply starts with success_prefix.
        Intermediate replies with an unexpected prefix fail the test.
        """
        self.writeLine(client_proc.stdin, "YR")
        buf = self.readLine(client_proc.stdout)
        self.assertTrue(buf.startswith("YR "))

        self.writeLine(server_proc.stdin, buf)
        buf = self.readLine(server_proc.stdout)
        self.assertTrue(buf.startswith("TT "))

        self.writeLine(client_proc.stdin, buf)
        buf = self.readLine(client_proc.stdout)
        self.assertTrue(buf.startswith("AF "))

        # Client sends 'AF <base64 blob>' but server
        # expects 'KK <base64 blob>'
        buf = buf.replace("AF", "KK", 1)

        self.writeLine(server_proc.stdin, buf)
        buf = self.readLine(server_proc.stdout)
        return buf.startswith(success_prefix)

    def _spnego_exchange(self, client_proc, server_proc):
        """Relay a gss-spnego-client <-> gss-spnego exchange.

        Messages are passed back and forth until the server answers
        'NA * ' (returns False), or either side signals 'AF' (returns True).
        """
        self.writeLine(server_proc.stdin, "YR")
        buf = self.readLine(server_proc.stdout)

        while True:
            if buf.startswith("NA * "):
                return False

            self.assertTrue(buf.startswith(("AF ", "TT ")))

            self.writeLine(client_proc.stdin, buf)
            buf = self.readLine(client_proc.stdout)

            if buf.startswith("AF"):
                return True

            self.assertTrue(buf.startswith(("AF ", "KK ", "TT ")))

            self.writeLine(server_proc.stdin, buf)
            buf = self.readLine(server_proc.stdout)

            if buf.startswith("AF * "):
                return True

    def _query_gk_gf(self, proc):
        """Send the 'GK' and 'GF' requests to proc and check both replies.

        Each reply must start with the request code followed by a space.
        """
        for request in ("GK", "GF"):
            self.writeLine(proc.stdin, request)
            buf = self.readLine(proc.stdout)
            self.assertTrue(buf.startswith(request + " "))

    def _reap_helper(self, proc):
        """Make sure a helper process is gone and its pipes are closed.

        Safe to call with None (process never started) and with a process
        that has already been waited for.
        """
        if proc is None:
            return
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        for stream in (proc.stdin, proc.stdout):
            if stream is None or stream.closed:
                continue
            try:
                stream.close()
            except OSError:
                # Best-effort cleanup: a broken pipe on close must not mask
                # the test failure that brought us here, nor stop the other
                # helper from being reaped.
                pass

    def run_helper(self,
                   client_username=None,
                   client_password=None,
                   client_domain=None,
                   client_use_cached_creds=False,
                   server_username=None,
                   server_password=None,
                   server_domain=None,
                   client_helper="ntlmssp-client-1",
                   server_helper="squid-2.5-ntlmssp",
                   server_use_winbind=False,
                   require_membership=None,
                   target_hostname=None,
                   target_service=None):
        """Run a client and a server ntlm_auth helper against each other.

        :param client_username: passed as --username to the client; required.
        :param client_password: passed as --password to the client; required
            unless client_use_cached_creds is set.
        :param client_domain: optional --domain for the client.
        :param client_use_cached_creds: pass --use-cached-creds to the client
            instead of a password.
        :param server_username: --username for the server; required (together
            with server_password and server_domain) unless server_use_winbind.
        :param server_password: --password for the server.
        :param server_domain: --domain for the server.
        :param client_helper: --helper-protocol of the client process.
        :param server_helper: --helper-protocol of the server process.
        :param server_use_winbind: do not pass credentials to the server.
        :param require_membership: optional --require-membership-of for the
            server; only allowed together with server_use_winbind.
        :param target_hostname: optional --target-hostname for the client.
        :param target_service: optional --target-service for the client.
        :return: True if the exchange ended with a success reply, else False.
        :raises Exception: if a required argument is missing or the
            combination of arguments is not allowed.

        The test fails if a helper sends an unexpected reply, if the
        client/server helper combination is not handled, or if a helper
        exits with a non-zero status.
        """
        self.assertTrue(os.access(self.ntlm_auth_path, os.X_OK))

        if client_username is None:
            raise Exception("client_username required")

        client_args = self._client_args(client_helper,
                                        client_username,
                                        client_password,
                                        client_domain,
                                        client_use_cached_creds,
                                        target_hostname,
                                        target_service)
        server_args = self._server_args(server_helper,
                                        server_username,
                                        server_password,
                                        server_domain,
                                        server_use_winbind,
                                        require_membership)

        # Unbuffered text-mode pipes so every line reaches the helper at once.
        popen_kwargs = dict(stdout=subprocess.PIPE,
                            stdin=subprocess.PIPE,
                            bufsize=0,
                            universal_newlines=True)

        # Both handles start as None so the cleanup below also copes with a
        # failure while spawning the second process.
        server_proc = None
        client_proc = None
        try:
            server_proc = subprocess.Popen(server_args, **popen_kwargs)
            client_proc = subprocess.Popen(client_args, **popen_kwargs)

            if client_helper == "ntlmssp-client-1" and server_helper == "squid-2.5-ntlmssp":
                result = self._ntlmssp_exchange(client_proc, server_proc, "AF ")
            elif client_helper == "ntlmssp-client-1" and server_helper == "gss-spnego":
                result = self._ntlmssp_exchange(client_proc, server_proc, "AF * ")
            elif client_helper == "gss-spnego-client" and server_helper == "gss-spnego":
                result = self._spnego_exchange(client_proc, server_proc)
            else:
                self.fail("Helper protocols not handled")

            if result and client_helper == "ntlmssp-client-1":
                self._query_gk_gf(client_proc)

            if result and server_helper == "squid-2.5-ntlmssp":
                self._query_gk_gf(server_proc)

            # Closing stdin signals end of input; the helpers are then
            # expected to exit cleanly.
            client_proc.stdin.close()
            client_proc.wait()
            self.assertEqual(client_proc.returncode, 0)

            server_proc.stdin.close()
            server_proc.wait()
            self.assertEqual(server_proc.returncode, 0)

            return result
        finally:
            # On failure this kills whatever is still running; on every path
            # it closes the remaining pipe ends so no descriptors leak.
            self._reap_helper(client_proc)
            self._reap_helper(server_proc)