1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 auth logging tests that exercise winbind
26 from samba.auth import system_session
27 from samba.credentials import Credentials
28 from samba.compat import get_string, get_bytes
29 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
30 from samba.dsdb import UF_NORMAL_ACCOUNT
31 from samba.messaging import Messaging
32 from samba.param import LoadParm
33 from samba.samdb import SamDB
34 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
35 from samba.tests.auth_log_base import AuthLogTestBase
40 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
43 # Helper function to watch for authentication messages on the
51 # Parent process: return the result socket to the caller.
54 # Load the lp context for the Domain Controller, rather than the
56 config_file = os.environ["DC_SERVERCONFFILE"]
58 lp_ctx.load(config_file)
61 # Is the message a SamLogon authentication?
67 msg["type"] == "Authentication" and
68 msg["Authentication"]["serviceDescription"] == "SamLogon")
71 # Handler function for received authentication messages.
72 def message_handler(context, msgType, src, message):
73 # Print the message to help debug the tests.
74 # As it's a JSON message it does not look like a subunit message.
76 self.dc_msgs.append(message)
78 # Set up a messaging context to listen for authentication events on
79 # the domain controller.
80 msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
81 msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
82 msg_handler_and_context = (message_handler, None)
83 msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
85 # Wait for the SamLogon message.
86 # As there could be other SamLogons in progress we need to collect
87 # all the SamLogons and let the caller match them to the session.
89 start_time = time.time()
90 while (time.time() - start_time < 1):
91 msg_ctx.loop_once(0.1)
93 # Only interested in SamLogon messages, filter out the rest
94 msgs = list(filter(is_sam_logon, self.dc_msgs))
97 os.write(w1, get_bytes(m+"\n"))
99 os.write(w1, get_bytes("None\n"))
102 msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
103 msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
107 # Remove any DCE/RPC ncacn_np messages.
108 # These only get triggered once per session, and stripping them out
109 # avoids ordering dependencies in the tests.
def filter_messages(self, messages):
    """Return ``messages`` without the DCE/RPC ncacn_np authorizations.

    A record is dropped only when its ``type`` is ``"Authorization"`` and
    its ``Authorization`` entry has ``serviceDescription`` ``"DCE/RPC"``
    and ``authType`` ``"ncacn_np"``.  All other records are returned, in
    their original order, as a new list.

    :param messages: iterable of decoded log records (dict-like)
    :return: list of the records that were kept
    """
    def is_ncacn_np_authorization(entry):
        # Only look inside "Authorization" once the type has matched, so
        # records of other types are never indexed by that key.
        if entry["type"] != "Authorization":
            return False
        details = entry["Authorization"]
        return (details["serviceDescription"] == "DCE/RPC" and
                details["authType"] == "ncacn_np")

    return [entry for entry in messages
            if not is_ncacn_np_authorization(entry)]
123 super(AuthLogTestsWinbind, self).setUp()
124 self.domain = os.environ["DOMAIN"]
125 self.host = os.environ["SERVER"]
126 self.dc = os.environ["DC_SERVER"]
127 self.lp = self.get_loadparm()
128 self.credentials = self.get_credentials()
129 self.session = system_session()
132 url="ldap://{0}".format(self.dc),
133 session_info=self.session,
134 credentials=self.credentials,
136 self.create_user_account()
139 super(AuthLogTestsWinbind, self).tearDown()
140 delete_force(self.ldb, self.user_dn)
143 # Create a test user account
144 def create_user_account(self):
145 self.user_pass = self.random_password()
146 self.user_name = USER_NAME
147 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
149 # remove the account if it exists, this will happen if a previous test
151 delete_force(self.ldb, self.user_dn)
153 utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
156 "objectclass": "user",
157 "sAMAccountName": "%s" % self.user_name,
158 "userAccountControl": str(UF_NORMAL_ACCOUNT),
159 "unicodePwd": utf16pw})
161 self.user_creds = Credentials()
162 self.user_creds.guess(self.get_loadparm())
163 self.user_creds.set_password(self.user_pass)
164 self.user_creds.set_username(self.user_name)
165 self.user_creds.set_workstation(self.server)
168 # Check that the domain server received a SamLogon request for the
def check_domain_server_authentication(self, pipe, logon_id, description):
    """Check that the domain server logged the expected SamLogon.

    Reads the watcher output from ``pipe``, selects the record whose
    ``logonId`` equals ``logon_id`` and validates its contents.  The test
    is failed through ``self.fail`` when no such record is available.

    :param pipe: file descriptor to read from; the tests in this file
        pass the value returned by ``self.dc_watcher()``
    :param logon_id: ``logonId`` the SamLogon record must carry
    :param description: expected ``authDescription`` of that record
    """
    # One read of at most 8192 bytes.
    received = get_string(os.read(pipe, 8192))

    # The writing side in this file ends every record with "\n" and sends
    # the line "None" in place of a record (presumably when nothing was
    # captured).  Drop that placeholder and the empty string left behind
    # by the trailing newline so that neither reaches json.loads.
    records = [line for line in received.split("\n")
               if line.strip() not in ("", "None")]
    if not records:
        self.fail("No Domain server authentication message")

    #
    # Look for the SamLogon request matching logon_id
    msg = None
    for record in records:
        candidate = json.loads(record)
        if logon_id == candidate["Authentication"]["logonId"]:
            msg = candidate
            break

    if msg is None:
        self.fail("No Domain server authentication message")

    #
    # Validate that message contains the expected data
    #
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual(logon_id, msg["Authentication"]["logonId"])
    self.assertEqual("SamLogon",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(description,
                     msg["Authentication"]["authDescription"])
200 def test_ntlm_auth(self):
202 def isLastExpectedMessage(msg):
203 DESC = "PAM_AUTH, ntlm_auth"
205 msg["type"] == "Authentication" and
206 msg["Authentication"]["serviceDescription"] == "winbind" and
207 msg["Authentication"]["authDescription"] is not None and
208 msg["Authentication"]["authDescription"].startswith(DESC))
210 pipe = self.dc_watcher()
211 COMMAND = "bin/ntlm_auth"
212 self.check_run("{0} --username={1} --password={2}".format(
214 self.credentials.get_username(),
215 self.credentials.get_password()),
216 msg="ntlm_auth failed")
218 messages = self.waitForMessages(isLastExpectedMessage)
219 messages = self.filter_messages(messages)
220 expected_messages = 1
221 self.assertEqual(expected_messages,
223 "Did not receive the expected number of messages")
225 # Check the first message; it should be an Authentication
227 self.assertEqual("Authentication", msg["type"])
229 msg["Authentication"]["authDescription"].startswith(
230 "PAM_AUTH, ntlm_auth,"))
231 self.assertEqual("winbind",
232 msg["Authentication"]["serviceDescription"])
233 self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
234 # Logon type should be NetworkCleartext
235 self.assertEqual(8, msg["Authentication"]["logonType"])
236 # Event code should be Successful logon
237 self.assertEqual(4624, msg["Authentication"]["eventId"])
238 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
239 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
240 self.assertEqual(self.domain, msg["Authentication"]["clientDomain"])
241 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
242 self.assertEqual(self.credentials.get_username(),
243 msg["Authentication"]["clientAccount"])
244 self.assertEqual(self.credentials.get_domain(),
245 msg["Authentication"]["clientDomain"])
246 self.assertTrue(msg["Authentication"]["workstation"] is None)
248 logon_id = msg["Authentication"]["logonId"]
251 # Now check the Domain server authentication message
253 self.check_domain_server_authentication(pipe, logon_id, "interactive")
255 def test_wbinfo(self):
256 def isLastExpectedMessage(msg):
257 DESC = "NTLM_AUTH, wbinfo"
259 msg["type"] == "Authentication" and
260 msg["Authentication"]["serviceDescription"] == "winbind" and
261 msg["Authentication"]["authDescription"] is not None and
262 msg["Authentication"]["authDescription"].startswith(DESC))
264 pipe = self.dc_watcher()
265 COMMAND = "bin/wbinfo"
267 self.check_run("{0} -a {1}%{2}".format(
269 self.credentials.get_username(),
270 self.credentials.get_password()),
271 msg="ntlm_auth failed")
272 except BlackboxProcessError:
275 messages = self.waitForMessages(isLastExpectedMessage)
276 messages = self.filter_messages(messages)
277 expected_messages = 3
278 self.assertEqual(expected_messages,
280 "Did not receive the expected number of messages")
282 # The 1st message should be an Authentication against the local
285 self.assertEqual("Authentication", msg["type"])
286 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
288 self.assertEqual("winbind",
289 msg["Authentication"]["serviceDescription"])
290 # Logon type should be Interactive
291 self.assertEqual(2, msg["Authentication"]["logonType"])
292 # Event code should be Unsuccessful logon
293 self.assertEqual(4625, msg["Authentication"]["eventId"])
294 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
295 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
296 self.assertEqual('', msg["Authentication"]["clientDomain"])
297 # This is what the existing winbind implementation returns.
298 self.assertEqual("NT_STATUS_NO_SUCH_USER",
299 msg["Authentication"]["status"])
300 self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
301 self.assertEqual(self.credentials.get_username(),
302 msg["Authentication"]["clientAccount"])
303 self.assertEqual("", msg["Authentication"]["clientDomain"])
305 logon_id = msg["Authentication"]["logonId"]
307 # The 2nd message should be a PAM_AUTH with the same logon id as the
310 self.assertEqual("Authentication", msg["type"])
311 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
313 self.assertEqual("winbind",
314 msg["Authentication"]["serviceDescription"])
315 self.assertEqual(logon_id, msg["Authentication"]["logonId"])
316 # Logon type should be NetworkCleartext
317 self.assertEqual(8, msg["Authentication"]["logonType"])
318 # Event code should be Unsuccessful logon
319 self.assertEqual(4625, msg["Authentication"]["eventId"])
320 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
321 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
322 self.assertEqual('', msg["Authentication"]["clientDomain"])
323 # This is what the existing winbind implementation returns.
324 self.assertEqual("NT_STATUS_NO_SUCH_USER",
325 msg["Authentication"]["status"])
326 self.assertEqual(self.credentials.get_username(),
327 msg["Authentication"]["clientAccount"])
328 self.assertEqual("", msg["Authentication"]["clientDomain"])
330 # The 3rd message should be an NTLM_AUTH
332 self.assertEqual("Authentication", msg["type"])
333 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
334 "NTLM_AUTH, wbinfo,"))
335 self.assertEqual("winbind",
336 msg["Authentication"]["serviceDescription"])
337 # Logon type should be Network
338 self.assertEqual(3, msg["Authentication"]["logonType"])
339 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
340 # Event code should be successful logon
341 self.assertEqual(4624, msg["Authentication"]["eventId"])
342 self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
343 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
344 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
345 self.assertEqual(self.credentials.get_username(),
346 msg["Authentication"]["clientAccount"])
347 self.assertEqual(self.credentials.get_domain(),
348 msg["Authentication"]["clientDomain"])
350 logon_id = msg["Authentication"]["logonId"]
353 # Now check the Domain server authentication message
355 self.check_domain_server_authentication(pipe, logon_id, "network")
357 def test_wbinfo_ntlmv1(self):
358 def isLastExpectedMessage(msg):
359 DESC = "NTLM_AUTH, wbinfo"
361 msg["type"] == "Authentication" and
362 msg["Authentication"]["serviceDescription"] == "winbind" and
363 msg["Authentication"]["authDescription"] is not None and
364 msg["Authentication"]["authDescription"].startswith(DESC))
366 pipe = self.dc_watcher()
367 COMMAND = "bin/wbinfo"
369 self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
371 self.credentials.get_username(),
372 self.credentials.get_password()),
373 msg="ntlm_auth failed")
374 except BlackboxProcessError:
377 messages = self.waitForMessages(isLastExpectedMessage)
378 messages = self.filter_messages(messages)
379 expected_messages = 3
380 self.assertEqual(expected_messages,
382 "Did not receive the expected number of messages")
384 # The 1st message should be an Authentication against the local
387 self.assertEqual("Authentication", msg["type"])
388 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
390 self.assertEqual("winbind",
391 msg["Authentication"]["serviceDescription"])
392 # Logon type should be Interactive
393 self.assertEqual(2, msg["Authentication"]["logonType"])
394 # Event code should be Unsuccessful logon
395 self.assertEqual(4625, msg["Authentication"]["eventId"])
396 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
397 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
398 self.assertEqual('', msg["Authentication"]["clientDomain"])
399 # This is what the existing winbind implementation returns.
400 self.assertEqual("NT_STATUS_NO_SUCH_USER",
401 msg["Authentication"]["status"])
402 self.assertEqual("NTLMv2", msg["Authentication"]["passwordType"])
403 self.assertEqual(self.credentials.get_username(),
404 msg["Authentication"]["clientAccount"])
405 self.assertEqual("", msg["Authentication"]["clientDomain"])
407 logon_id = msg["Authentication"]["logonId"]
409 # The 2nd message should be a PAM_AUTH with the same logon id as the
412 self.assertEqual("Authentication", msg["type"])
413 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
415 self.assertEqual("winbind",
416 msg["Authentication"]["serviceDescription"])
417 self.assertEqual(logon_id, msg["Authentication"]["logonId"])
418 self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
419 # Logon type should be NetworkCleartext
420 self.assertEqual(8, msg["Authentication"]["logonType"])
421 # Event code should be Unsuccessful logon
422 self.assertEqual(4625, msg["Authentication"]["eventId"])
423 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
424 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
425 self.assertEqual('', msg["Authentication"]["clientDomain"])
426 # This is what the existing winbind implementation returns.
427 self.assertEqual("NT_STATUS_NO_SUCH_USER",
428 msg["Authentication"]["status"])
429 self.assertEqual(self.credentials.get_username(),
430 msg["Authentication"]["clientAccount"])
431 self.assertEqual("", msg["Authentication"]["clientDomain"])
433 # The 3rd message should be an NTLM_AUTH
435 self.assertEqual("Authentication", msg["type"])
436 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
437 "NTLM_AUTH, wbinfo,"))
438 self.assertEqual("winbind",
439 msg["Authentication"]["serviceDescription"])
440 self.assertEqual("NTLMv1",
441 msg["Authentication"]["passwordType"])
442 # Logon type should be Network
443 self.assertEqual(3, msg["Authentication"]["logonType"])
444 self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
445 # Event code should be successful logon
446 self.assertEqual(4624, msg["Authentication"]["eventId"])
447 self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
448 self.assertEqual("unix:", msg["Authentication"]["localAddress"])
449 self.assertEqual(self.credentials.get_username(),
450 msg["Authentication"]["clientAccount"])
451 self.assertEqual(self.credentials.get_domain(),
452 msg["Authentication"]["clientDomain"])
454 logon_id = msg["Authentication"]["logonId"]
456 # Now check the Domain server authentication message
458 self.check_domain_server_authentication(pipe, logon_id, "network")