pytests: heed assertEquals deprecation warning en-masse
[gd/samba-autobuild/.git] / python / samba / tests / auth_log_winbind.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """
19     auth logging tests that exercise winbind
20 """
21
22 import json
23 import os
24 import time
25
26 from samba.auth import system_session
27 from samba.credentials import Credentials
28 from samba.compat import get_string, get_bytes
29 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
30 from samba.dsdb import UF_NORMAL_ACCOUNT
31 from samba.messaging import Messaging
32 from samba.param import LoadParm
33 from samba.samdb import SamDB
34 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
35 from samba.tests.auth_log_base import AuthLogTestBase
36
37 USER_NAME = "WBALU"
38
39
40 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
41
42     #
43     # Helper function to watch for authentication messages on the
44     # Domain Controller.
45     #
46     def dc_watcher(self):
47
48         (r1, w1) = os.pipe()
49         pid = os.fork()
50         if pid != 0:
51             # Parent process return the result socket to the caller.
52             return r1
53
54         # Load the lp context for the Domain Controller, rather than the
55         # member server.
56         config_file = os.environ["DC_SERVERCONFFILE"]
57         lp_ctx = LoadParm()
58         lp_ctx.load(config_file)
59
60         #
61         # Is the message a SamLogon authentication?
62         def is_sam_logon(m):
63             if m is None:
64                 return False
65             msg = json.loads(m)
66             return (
67                 msg["type"] == "Authentication" and
68                 msg["Authentication"]["serviceDescription"] == "SamLogon")
69
70         #
71         # Handler function for received authentication messages.
72         def message_handler(context, msgType, src, message):
73             # Print the message to help debugging the tests.
74             # as it's a JSON message it does not look like a sub-unit message.
75             print(message)
76             self.dc_msgs.append(message)
77
78         # Set up a messaging context to listen for authentication events on
79         # the domain controller.
80         msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
81         msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
82         msg_handler_and_context = (message_handler, None)
83         msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
84
85         # Wait for the SamLogon message.
86         # As there could be other SamLogon's in progress we need to collect
87         # all the SamLogons and let the caller match them to the session.
88         self.dc_msgs = []
89         start_time = time.time()
90         while (time.time() - start_time < 1):
91             msg_ctx.loop_once(0.1)
92
93         # Only interested in SamLogon messages, filter out the rest
94         msgs = list(filter(is_sam_logon, self.dc_msgs))
95         if msgs:
96             for m in msgs:
97                 os.write(w1, get_bytes(m+"\n"))
98         else:
99             os.write(w1, get_bytes("None\n"))
100         os.close(w1)
101
102         msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
103         msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
104
105         os._exit(0)
106
107     # Remove any DCE/RPC ncacn_np messages
108     # these only get triggered once per session, and stripping them out
109     # avoids ordering dependencies in the tests
110     #
111     def filter_messages(self, messages):
112         def keep(msg):
113             if (msg["type"] == "Authorization" and
114                 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
115                 msg["Authorization"]["authType"] == "ncacn_np"):
116                     return False
117             else:
118                 return True
119
120         return list(filter(keep, messages))
121
122     def setUp(self):
123         super(AuthLogTestsWinbind, self).setUp()
124         self.domain = os.environ["DOMAIN"]
125         self.host = os.environ["SERVER"]
126         self.dc = os.environ["DC_SERVER"]
127         self.lp = self.get_loadparm()
128         self.credentials = self.get_credentials()
129         self.session = system_session()
130
131         self.ldb = SamDB(
132             url="ldap://{0}".format(self.dc),
133             session_info=self.session,
134             credentials=self.credentials,
135             lp=self.lp)
136         self.create_user_account()
137
138     def tearDown(self):
139         super(AuthLogTestsWinbind, self).tearDown()
140         delete_force(self.ldb, self.user_dn)
141
142     #
143     # Create a test user account
144     def create_user_account(self):
145         self.user_pass = self.random_password()
146         self.user_name = USER_NAME
147         self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
148
149         # remove the account if it exists, this will happen if a previous test
150         # run failed
151         delete_force(self.ldb, self.user_dn)
152
153         utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
154         self.ldb.add({
155            "dn": self.user_dn,
156            "objectclass": "user",
157            "sAMAccountName": "%s" % self.user_name,
158            "userAccountControl": str(UF_NORMAL_ACCOUNT),
159            "unicodePwd": utf16pw})
160
161         self.user_creds = Credentials()
162         self.user_creds.guess(self.get_loadparm())
163         self.user_creds.set_password(self.user_pass)
164         self.user_creds.set_username(self.user_name)
165         self.user_creds.set_workstation(self.server)
166
167     #
168     # Check that the domain server received a SamLogon request for the
169     # current logon.
170     #
171     def check_domain_server_authentication(self, pipe, logon_id, description):
172
173         messages = os.read(pipe, 8192)
174         messages = get_string(messages)
175         if len(messages) == 0 or messages == "None":
176             self.fail("No Domain server authentication message")
177
178         #
179         # Look for the SamLogon request matching logon_id
180         msg = None
181         for message in messages.split("\n"):
182             msg = json.loads(get_string(message))
183             if logon_id == msg["Authentication"]["logonId"]:
184                 break
185             msg = None
186
187         if msg is None:
188             self.fail("No Domain server authentication message")
189
190         #
191         # Validate that message contains the expected data
192         #
193         self.assertEqual("Authentication", msg["type"])
194         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
195         self.assertEqual("SamLogon",
196                           msg["Authentication"]["serviceDescription"])
197         self.assertEqual(description,
198                           msg["Authentication"]["authDescription"])
199
200     def test_ntlm_auth(self):
201
202         def isLastExpectedMessage(msg):
203             DESC = "PAM_AUTH, ntlm_auth"
204             return (
205                 msg["type"] == "Authentication" and
206                 msg["Authentication"]["serviceDescription"] == "winbind" and
207                 msg["Authentication"]["authDescription"] is not None and
208                 msg["Authentication"]["authDescription"].startswith(DESC))
209
210         pipe = self.dc_watcher()
211         COMMAND = "bin/ntlm_auth"
212         self.check_run("{0} --username={1} --password={2}".format(
213             COMMAND,
214             self.credentials.get_username(),
215             self.credentials.get_password()),
216             msg="ntlm_auth failed")
217
218         messages = self.waitForMessages(isLastExpectedMessage)
219         messages = self.filter_messages(messages)
220         expected_messages = 1
221         self.assertEqual(expected_messages,
222                           len(messages),
223                           "Did not receive the expected number of messages")
224
225         # Check the first message it should be an Authentication
226         msg = messages[0]
227         self.assertEqual("Authentication", msg["type"])
228         self.assertTrue(
229             msg["Authentication"]["authDescription"].startswith(
230                 "PAM_AUTH, ntlm_auth,"))
231         self.assertEqual("winbind",
232                           msg["Authentication"]["serviceDescription"])
233         self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
234         # Logon type should be NetworkCleartext
235         self.assertEqual(8, msg["Authentication"]["logonType"])
236         # Event code should be Successful logon
237         self.assertEqual(4624, msg["Authentication"]["eventId"])
238         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
239         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
240         self.assertEqual(self.domain, msg["Authentication"]["clientDomain"])
241         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
242         self.assertEqual(self.credentials.get_username(),
243                           msg["Authentication"]["clientAccount"])
244         self.assertEqual(self.credentials.get_domain(),
245                           msg["Authentication"]["clientDomain"])
246         self.assertTrue(msg["Authentication"]["workstation"] is None)
247
248         logon_id = msg["Authentication"]["logonId"]
249
250         #
251         # Now check the Domain server authentication message
252         #
253         self.check_domain_server_authentication(pipe, logon_id, "interactive")
254
255     def test_wbinfo(self):
256         def isLastExpectedMessage(msg):
257             DESC = "NTLM_AUTH, wbinfo"
258             return (
259                 msg["type"] == "Authentication" and
260                 msg["Authentication"]["serviceDescription"] == "winbind" and
261                 msg["Authentication"]["authDescription"] is not None and
262                 msg["Authentication"]["authDescription"].startswith(DESC))
263
264         pipe = self.dc_watcher()
265         COMMAND = "bin/wbinfo"
266         try:
267             self.check_run("{0} -a {1}%{2}".format(
268                 COMMAND,
269                 self.credentials.get_username(),
270                 self.credentials.get_password()),
271                 msg="ntlm_auth failed")
272         except BlackboxProcessError:
273             pass
274
275         messages = self.waitForMessages(isLastExpectedMessage)
276         messages = self.filter_messages(messages)
277         expected_messages = 3
278         self.assertEqual(expected_messages,
279                           len(messages),
280                           "Did not receive the expected number of messages")
281
282         # The 1st message should be an Authentication against the local
283         # password database
284         msg = messages[0]
285         self.assertEqual("Authentication", msg["type"])
286         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
287             "PASSDB, wbinfo,"))
288         self.assertEqual("winbind",
289                           msg["Authentication"]["serviceDescription"])
290         # Logon type should be Interactive
291         self.assertEqual(2, msg["Authentication"]["logonType"])
292         # Event code should be Unsuccessful logon
293         self.assertEqual(4625, msg["Authentication"]["eventId"])
294         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
295         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
296         self.assertEqual('', msg["Authentication"]["clientDomain"])
297         # This is what the existing winbind implementation returns.
298         self.assertEqual("NT_STATUS_NO_SUCH_USER",
299                           msg["Authentication"]["status"])
300         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
301         self.assertEqual(self.credentials.get_username(),
302                           msg["Authentication"]["clientAccount"])
303         self.assertEqual("", msg["Authentication"]["clientDomain"])
304
305         logon_id = msg["Authentication"]["logonId"]
306
307         # The 2nd message should be a PAM_AUTH with the same logon id as the
308         # 1st message
309         msg = messages[1]
310         self.assertEqual("Authentication", msg["type"])
311         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
312             "PAM_AUTH"))
313         self.assertEqual("winbind",
314                           msg["Authentication"]["serviceDescription"])
315         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
316         # Logon type should be NetworkCleartext
317         self.assertEqual(8, msg["Authentication"]["logonType"])
318         # Event code should be Unsuccessful logon
319         self.assertEqual(4625, msg["Authentication"]["eventId"])
320         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
321         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
322         self.assertEqual('', msg["Authentication"]["clientDomain"])
323         # This is what the existing winbind implementation returns.
324         self.assertEqual("NT_STATUS_NO_SUCH_USER",
325                           msg["Authentication"]["status"])
326         self.assertEqual(self.credentials.get_username(),
327                           msg["Authentication"]["clientAccount"])
328         self.assertEqual("", msg["Authentication"]["clientDomain"])
329
330         # The 3rd message should be an NTLM_AUTH
331         msg = messages[2]
332         self.assertEqual("Authentication", msg["type"])
333         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
334             "NTLM_AUTH, wbinfo,"))
335         self.assertEqual("winbind",
336                           msg["Authentication"]["serviceDescription"])
337         # Logon type should be Network
338         self.assertEqual(3, msg["Authentication"]["logonType"])
339         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
340         # Event code should be successful logon
341         self.assertEqual(4624, msg["Authentication"]["eventId"])
342         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
343         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
344         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
345         self.assertEqual(self.credentials.get_username(),
346                           msg["Authentication"]["clientAccount"])
347         self.assertEqual(self.credentials.get_domain(),
348                           msg["Authentication"]["clientDomain"])
349
350         logon_id = msg["Authentication"]["logonId"]
351
352         #
353         # Now check the Domain server authentication message
354         #
355         self.check_domain_server_authentication(pipe, logon_id, "network")
356
357     def test_wbinfo_ntlmv1(self):
358         def isLastExpectedMessage(msg):
359             DESC = "NTLM_AUTH, wbinfo"
360             return (
361                 msg["type"] == "Authentication" and
362                 msg["Authentication"]["serviceDescription"] == "winbind" and
363                 msg["Authentication"]["authDescription"] is not None and
364                 msg["Authentication"]["authDescription"].startswith(DESC))
365
366         pipe = self.dc_watcher()
367         COMMAND = "bin/wbinfo"
368         try:
369             self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
370                 COMMAND,
371                 self.credentials.get_username(),
372                 self.credentials.get_password()),
373                 msg="ntlm_auth failed")
374         except BlackboxProcessError:
375             pass
376
377         messages = self.waitForMessages(isLastExpectedMessage)
378         messages = self.filter_messages(messages)
379         expected_messages = 3
380         self.assertEqual(expected_messages,
381                           len(messages),
382                           "Did not receive the expected number of messages")
383
384         # The 1st message should be an Authentication against the local
385         # password database
386         msg = messages[0]
387         self.assertEqual("Authentication", msg["type"])
388         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
389             "PASSDB, wbinfo,"))
390         self.assertEqual("winbind",
391                           msg["Authentication"]["serviceDescription"])
392         # Logon type should be Interactive
393         self.assertEqual(2, msg["Authentication"]["logonType"])
394         # Event code should be Unsuccessful logon
395         self.assertEqual(4625, msg["Authentication"]["eventId"])
396         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
397         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
398         self.assertEqual('', msg["Authentication"]["clientDomain"])
399         # This is what the existing winbind implementation returns.
400         self.assertEqual("NT_STATUS_NO_SUCH_USER",
401                           msg["Authentication"]["status"])
402         self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
403         self.assertEqual(self.credentials.get_username(),
404                           msg["Authentication"]["clientAccount"])
405         self.assertEqual("", msg["Authentication"]["clientDomain"])
406
407         logon_id = msg["Authentication"]["logonId"]
408
409         # The 2nd message should be a PAM_AUTH with the same logon id as the
410         # 1st message
411         msg = messages[1]
412         self.assertEqual("Authentication", msg["type"])
413         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
414             "PAM_AUTH"))
415         self.assertEqual("winbind",
416                           msg["Authentication"]["serviceDescription"])
417         self.assertEqual(logon_id, msg["Authentication"]["logonId"])
418         self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
419         # Logon type should be NetworkCleartext
420         self.assertEqual(8, msg["Authentication"]["logonType"])
421         # Event code should be Unsuccessful logon
422         self.assertEqual(4625, msg["Authentication"]["eventId"])
423         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
424         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
425         self.assertEqual('', msg["Authentication"]["clientDomain"])
426         # This is what the existing winbind implementation returns.
427         self.assertEqual("NT_STATUS_NO_SUCH_USER",
428                           msg["Authentication"]["status"])
429         self.assertEqual(self.credentials.get_username(),
430                           msg["Authentication"]["clientAccount"])
431         self.assertEqual("", msg["Authentication"]["clientDomain"])
432
433         # The 3rd message should be an NTLM_AUTH
434         msg = messages[2]
435         self.assertEqual("Authentication", msg["type"])
436         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
437             "NTLM_AUTH, wbinfo,"))
438         self.assertEqual("winbind",
439                           msg["Authentication"]["serviceDescription"])
440         self.assertEqual("NTLMv1",
441                           msg["Authentication"]["passwordType"])
442         # Logon type should be Network
443         self.assertEqual(3, msg["Authentication"]["logonType"])
444         self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
445         # Event code should be successful logon
446         self.assertEqual(4624, msg["Authentication"]["eventId"])
447         self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
448         self.assertEqual("unix:", msg["Authentication"]["localAddress"])
449         self.assertEqual(self.credentials.get_username(),
450                           msg["Authentication"]["clientAccount"])
451         self.assertEqual(self.credentials.get_domain(),
452                           msg["Authentication"]["clientDomain"])
453
454         logon_id = msg["Authentication"]["logonId"]
455         #
456         # Now check the Domain server authentication message
457         #
458         self.check_domain_server_authentication(pipe, logon_id, "network")