1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 auth logging tests that exercise winbind
24 from random import SystemRandom
28 from samba.auth import system_session
29 from samba.credentials import Credentials
30 from samba.compat import get_string, get_bytes
31 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
32 from samba.dsdb import UF_NORMAL_ACCOUNT
33 from samba.messaging import Messaging
34 from samba.param import LoadParm
35 from samba.samdb import SamDB
36 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
37 from samba.tests.auth_log_base import AuthLogTestBase
42 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
# NOTE(review): this is an excerpt of a numbered listing with gaps. The
# method header is not visible; the tests call `self.dc_watcher()` and pass
# the result to os.read(), so this is presumably `def dc_watcher(self)`
# returning the read end of a pipe -- confirm against the complete file.
45 # Helper function to watch for authentication messages on the
53 # Parent process return the result socket to the caller.
56 # Load the lp context for the Domain Controller, rather than the
58 config_file = os.environ["DC_SERVERCONFFILE"]
# NOTE(review): `lp_ctx` is used below but its construction is not visible
# in this listing.
60 lp_ctx.load(config_file)
63 # Is the message a SamLogon authentication?
# Visible part of the predicate: an "Authentication" message whose
# serviceDescription is "SamLogon" and whose authDescription is
# "interactive".
# NOTE(review): test_wbinfo and test_wbinfo_ntlmv1 expect a SamLogon message
# with authDescription "network", which this predicate does not match, so
# for those tests the wait loop below would end on its one second limit --
# confirm this is intended.
69 msg["type"] == "Authentication" and
70 msg["Authentication"]["serviceDescription"] == "SamLogon" and
71 msg["Authentication"]["authDescription"] == "interactive")
74 # Handler function for received authentication messages.
# NOTE(review): the handler body is not visible; the code below reads
# self.dc_msg, so the handler presumably stores the message there.
75 def message_handler(context, msgType, src, message):
76 # Print the message to help debugging the tests.
77 # as it's a JSON message it does not look like a sub-unit message.
81 # Set up a messaging context to listen for authentication events on
82 # the domain controller.
83 msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
84 msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
# The same (handler, context) tuple is needed again for deregister() below.
85 msg_handler_and_context = (message_handler, None)
86 msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
88 # Wait for the SamLogon message.
89 # As the SamLogon is the only message we're interested ignore any other
92 start_time = time.time()
# Keep running the messaging loop until is_sam_logon accepts the stored
# message or one second has elapsed since start_time.
93 while not is_sam_logon(self.dc_msg) and (time.time() - start_time < 1):
94 msg_ctx.loop_once(0.1)
# Write the captured message, or the literal text "None" when nothing was
# captured, to w1.
96 if self.dc_msg is None:
97 os.write(w1, get_bytes("None"))
# NOTE(review): the `else:` line that belongs between these two writes is
# not visible in this listing.
99 os.write(w1, get_bytes(self.dc_msg))
# Undo the registration and the irpc name added above.
101 msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
102 msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
# Remove any DCE/RPC ncacn_np messages
# these only get triggered once per session, and stripping them out
# avoids ordering dependencies in the tests
#
def filter_messages(self, messages):
    """Return ``messages`` without DCE/RPC ncacn_np Authorization entries.

    :param messages: iterable of decoded auth log messages (dicts with a
        "type" key and a sub-dict named after that type).
    :return: a new list holding every message that is not an
        "Authorization" message for service "DCE/RPC" with authType
        "ncacn_np", in the original order.
    """
    def keep(msg):
        # The checks short-circuit in this order so that the
        # "Authorization" sub-dict is only looked up on messages of that
        # type.
        is_ncacn_np_authorization = (
            msg["type"] == "Authorization" and
            msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
            msg["Authorization"]["authType"] == "ncacn_np")
        return not is_ncacn_np_authorization

    return list(filter(keep, messages))
# NOTE(review): numbered-listing excerpt with gaps. The `def` header is not
# visible; the super() call below indicates this is the body of setUp.
# Reads DOMAIN, SERVER and DC_SERVER from the environment, stores the
# loadparm, credentials and system session on the instance, and finishes by
# creating the test user account.
122 super(AuthLogTestsWinbind, self).setUp()
123 self.domain = os.environ["DOMAIN"]
124 self.host = os.environ["SERVER"]
125 self.dc = os.environ["DC_SERVER"]
126 self.lp = self.get_loadparm()
127 self.credentials = self.get_credentials()
128 self.session = system_session()
# NOTE(review): the line opening the call that receives the keyword
# arguments below is missing from this listing. Other methods use self.ldb
# and SamDB is imported, so it is presumably `self.ldb = SamDB(` -- confirm.
131 url="ldap://{0}".format(self.dc),
132 session_info=self.session,
133 credentials=self.credentials,
# NOTE(review): the line that closes the call above is not visible either.
135 self.create_user_account()
def tearDown(self):
    """Run the base-class teardown, then delete the test user account."""
    super(AuthLogTestsWinbind, self).tearDown()
    delete_force(self.ldb, self.user_dn)
# NOTE(review): numbered-listing excerpt with gaps; some lines of this
# method are not visible.
# Generates a random password, derives the user DN from USER_NAME and the
# domain DN, deletes any existing account with that DN, and builds a
# Credentials object for the test user.
142 # Create a test user account
143 def create_user_account(self):
144 self.user_pass = self.random_password()
145 self.user_name = USER_NAME
146 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
148 # remove the account if it exists, this will happen if a previous test
150 delete_force(self.ldb, self.user_dn)
# The password is wrapped in double quotes and encoded as UTF-16-LE before
# being used as the "unicodePwd" value below.
152 utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
# NOTE(review): the line opening the call that takes this dict, and any
# dict entries before "objectclass", are missing from this listing.
155 "objectclass": "user",
156 "sAMAccountName": "%s" % self.user_name,
157 "userAccountControl": str(UF_NORMAL_ACCOUNT),
158 "unicodePwd": utf16pw})
# Credentials for the test user; defaults are guessed from the loadparm and
# then the password, username and workstation are set explicitly.
160 self.user_creds = Credentials()
161 self.user_creds.guess(self.get_loadparm())
162 self.user_creds.set_password(self.user_pass)
163 self.user_creds.set_username(self.user_name)
# NOTE(review): self.server is not assigned in the visible setUp (which sets
# self.host); presumably a base class provides it -- verify.
164 self.user_creds.set_workstation(self.server)
def test_ntlm_auth(self):
    """Check the auth log events produced by a plaintext ntlm_auth logon.

    Runs ``bin/ntlm_auth`` with the test credentials, expects exactly one
    winbind Authentication message once DCE/RPC ncacn_np messages have
    been filtered out, and then checks that the message captured by
    ``dc_watcher`` is the interactive SamLogon with the same logonId.
    """

    def isLastExpectedMessage(msg):
        DESC = "PAM_AUTH, ntlm_auth"
        return (
            msg["type"] == "Authentication" and
            msg["Authentication"]["serviceDescription"] == "winbind" and
            msg["Authentication"]["authDescription"] is not None and
            msg["Authentication"]["authDescription"].startswith(DESC))

    # Start watching before running the command so the message sent while
    # it runs can be captured.
    pipe = self.dc_watcher()
    COMMAND = "bin/ntlm_auth"
    self.check_run("{0} --username={1} --password={2}".format(
        COMMAND,
        self.credentials.get_username(),
        self.credentials.get_password()),
        msg="ntlm_auth failed")

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.filter_messages(messages)
    expected_messages = 1
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertTrue(
        msg["Authentication"]["authDescription"].startswith(
            "PAM_AUTH, ntlm_auth,"))
    self.assertEqual("winbind",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual("Plaintext", msg["Authentication"]["passwordType"])
    # Logon type should be NetworkCleartext
    self.assertEqual(8, msg["Authentication"]["logonType"])
    # Event code should be Successful logon
    self.assertEqual(4624, msg["Authentication"]["eventId"])
    self.assertEqual("unix:", msg["Authentication"]["remoteAddress"])
    self.assertEqual("unix:", msg["Authentication"]["localAddress"])
    self.assertEqual(self.domain, msg["Authentication"]["clientDomain"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual(self.credentials.get_username(),
                     msg["Authentication"]["clientAccount"])
    self.assertEqual(self.credentials.get_domain(),
                     msg["Authentication"]["clientDomain"])
    self.assertIsNone(msg["Authentication"]["workstation"])

    logon_id = msg["Authentication"]["logonId"]

    # Read the message forwarded by dc_watcher, then release the
    # descriptor so it is not leaked from one test to the next.
    try:
        message = os.read(pipe, 4096)
    finally:
        os.close(pipe)
    msg = json.loads(get_string(message))
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual(logon_id, msg["Authentication"]["logonId"])
    self.assertEqual("SamLogon",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual("interactive",
                     msg["Authentication"]["authDescription"])
# Exercise `bin/wbinfo -a user%password` and check the resulting auth log
# events: three winbind Authentication messages are expected after
# filtering, followed by the SamLogon message read from the dc_watcher
# pipe, which must carry the logonId of the 3rd (NTLM_AUTH) message.
# NOTE(review): numbered-listing excerpt with gaps; several lines of this
# method are not visible and are flagged below.
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed from unittest in Python 3.12.
225 def test_wbinfo(self):
# Predicate handed to waitForMessages: a winbind Authentication message
# whose authDescription starts with DESC.
226 def isLastExpectedMessage(msg):
227 DESC = "NTLM_AUTH, wbinfo"
229 msg["type"] == "Authentication" and
230 msg["Authentication"]["serviceDescription"] == "winbind" and
231 msg["Authentication"]["authDescription"] is not None and
232 msg["Authentication"]["authDescription"].startswith(DESC))
234 pipe = self.dc_watcher()
235 COMMAND = "bin/wbinfo"
# NOTE(review): the `try:` line, the first format() argument and the body
# of the except clause are not visible. The failure text below says
# "ntlm_auth" although the command being run is wbinfo.
237 self.check_run("{0} -a {1}%{2}".format(
239 self.credentials.get_username(),
240 self.credentials.get_password()),
241 msg="ntlm_auth failed")
242 except BlackboxProcessError:
245 messages = self.waitForMessages(isLastExpectedMessage)
246 messages = self.filter_messages(messages)
247 expected_messages = 3
# NOTE(review): the second argument of this assertion is not visible.
248 self.assertEquals(expected_messages,
250 "Did not receive the expected number of messages")
252 # The 1st message should be an Authentication against the local
# NOTE(review): the assignment of `msg` for this section is not visible.
255 self.assertEquals("Authentication", msg["type"])
# NOTE(review): the line holding the expected authDescription prefix is
# missing from this listing; restore it from the complete file.
256 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
258 self.assertEquals("winbind",
259 msg["Authentication"]["serviceDescription"])
260 # Logon type should be Interactive
261 self.assertEquals(2, msg["Authentication"]["logonType"])
262 # Event code should be Unsuccessful logon
263 self.assertEquals(4625, msg["Authentication"]["eventId"])
264 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
265 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
266 self.assertEquals('', msg["Authentication"]["clientDomain"])
267 # This is what the existing winbind implementation returns.
268 self.assertEquals("NT_STATUS_NO_SUCH_USER",
269 msg["Authentication"]["status"])
270 self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
271 self.assertEquals(self.credentials.get_username(),
272 msg["Authentication"]["clientAccount"])
# Repeats the clientDomain check made a few lines above.
273 self.assertEquals("", msg["Authentication"]["clientDomain"])
# Remembered so the 2nd message can be matched against it.
275 logon_id = msg["Authentication"]["logonId"]
277 # The 2nd message should be a PAM_AUTH with the same logon id as the
# NOTE(review): the assignment of `msg` for this section is not visible.
280 self.assertEquals("Authentication", msg["type"])
# NOTE(review): the line holding the expected authDescription prefix is
# missing from this listing.
281 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
283 self.assertEquals("winbind",
284 msg["Authentication"]["serviceDescription"])
285 self.assertEquals(logon_id, msg["Authentication"]["logonId"])
286 # Logon type should be NetworkCleartext
287 self.assertEquals(8, msg["Authentication"]["logonType"])
288 # Event code should be Unsuccessful logon
289 self.assertEquals(4625, msg["Authentication"]["eventId"])
290 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
291 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
292 self.assertEquals('', msg["Authentication"]["clientDomain"])
293 # This is what the existing winbind implementation returns.
294 self.assertEquals("NT_STATUS_INVALID_HANDLE",
295 msg["Authentication"]["status"])
296 self.assertEquals(self.credentials.get_username(),
297 msg["Authentication"]["clientAccount"])
298 self.assertEquals("", msg["Authentication"]["clientDomain"])
300 # The 3rd message should be an NTLM_AUTH
# NOTE(review): the assignment of `msg` for this section is not visible.
302 self.assertEquals("Authentication", msg["type"])
303 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
304 "NTLM_AUTH, wbinfo,"))
305 self.assertEquals("winbind",
306 msg["Authentication"]["serviceDescription"])
307 # Logon type should be Network
308 self.assertEquals(3, msg["Authentication"]["logonType"])
309 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
310 # Event code should be successful logon
311 self.assertEquals(4624, msg["Authentication"]["eventId"])
312 self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
313 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
314 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
315 self.assertEquals(self.credentials.get_username(),
316 msg["Authentication"]["clientAccount"])
317 self.assertEquals(self.credentials.get_domain(),
318 msg["Authentication"]["clientDomain"])
# Overwrites the earlier logon_id; the message read from the pipe below is
# compared against this one.
320 logon_id = msg["Authentication"]["logonId"]
323 # Now check the Domain server authentication message
# Reads at most 4096 bytes from the value returned by dc_watcher and
# decodes them as JSON.
325 message = os.read(pipe, 4096)
326 msg = json.loads(get_string(message))
327 self.assertEquals("Authentication", msg["type"])
328 self.assertEquals(logon_id, msg["Authentication"]["logonId"])
329 self.assertEquals("SamLogon",
330 msg["Authentication"]["serviceDescription"])
331 self.assertEquals("network",
332 msg["Authentication"]["authDescription"])
# Exercise `bin/wbinfo --ntlmv1 -a user%password` and check the resulting
# auth log events: three winbind Authentication messages are expected after
# filtering (the 3rd with passwordType "NTLMv1"), followed by the SamLogon
# message read from the dc_watcher pipe, which must carry the logonId of
# the 3rd message.
# NOTE(review): numbered-listing excerpt with gaps; several lines of this
# method are not visible and are flagged below.
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed from unittest in Python 3.12.
334 def test_wbinfo_ntlmv1(self):
# Predicate handed to waitForMessages: a winbind Authentication message
# whose authDescription starts with DESC.
335 def isLastExpectedMessage(msg):
336 DESC = "NTLM_AUTH, wbinfo"
338 msg["type"] == "Authentication" and
339 msg["Authentication"]["serviceDescription"] == "winbind" and
340 msg["Authentication"]["authDescription"] is not None and
341 msg["Authentication"]["authDescription"].startswith(DESC))
343 pipe = self.dc_watcher()
344 COMMAND = "bin/wbinfo"
# NOTE(review): the `try:` line, the first format() argument and the body
# of the except clause are not visible. The failure text below says
# "ntlm_auth" although the command being run is wbinfo.
346 self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
348 self.credentials.get_username(),
349 self.credentials.get_password()),
350 msg="ntlm_auth failed")
351 except BlackboxProcessError:
354 messages = self.waitForMessages(isLastExpectedMessage)
355 messages = self.filter_messages(messages)
356 expected_messages = 3
# NOTE(review): the second argument of this assertion is not visible.
357 self.assertEquals(expected_messages,
359 "Did not receive the expected number of messages")
361 # The 1st message should be an Authentication against the local
# NOTE(review): the assignment of `msg` for this section is not visible.
364 self.assertEquals("Authentication", msg["type"])
# NOTE(review): the line holding the expected authDescription prefix is
# missing from this listing; restore it from the complete file.
365 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
367 self.assertEquals("winbind",
368 msg["Authentication"]["serviceDescription"])
369 # Logon type should be Interactive
370 self.assertEquals(2, msg["Authentication"]["logonType"])
371 # Event code should be Unsuccessful logon
372 self.assertEquals(4625, msg["Authentication"]["eventId"])
373 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
374 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
375 self.assertEquals('', msg["Authentication"]["clientDomain"])
376 # This is what the existing winbind implementation returns.
377 self.assertEquals("NT_STATUS_NO_SUCH_USER",
378 msg["Authentication"]["status"])
379 self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
380 self.assertEquals(self.credentials.get_username(),
381 msg["Authentication"]["clientAccount"])
# Repeats the clientDomain check made a few lines above.
382 self.assertEquals("", msg["Authentication"]["clientDomain"])
# Remembered so the 2nd message can be matched against it.
384 logon_id = msg["Authentication"]["logonId"]
386 # The 2nd message should be a PAM_AUTH with the same logon id as the
# NOTE(review): the assignment of `msg` for this section is not visible.
389 self.assertEquals("Authentication", msg["type"])
# NOTE(review): the line holding the expected authDescription prefix is
# missing from this listing.
390 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
392 self.assertEquals("winbind",
393 msg["Authentication"]["serviceDescription"])
394 self.assertEquals(logon_id, msg["Authentication"]["logonId"])
395 self.assertEquals("Plaintext", msg["Authentication"]["passwordType"])
396 # Logon type should be NetworkCleartext
397 self.assertEquals(8, msg["Authentication"]["logonType"])
398 # Event code should be Unsuccessful logon
399 self.assertEquals(4625, msg["Authentication"]["eventId"])
400 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
401 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
402 self.assertEquals('', msg["Authentication"]["clientDomain"])
403 # This is what the existing winbind implementation returns.
404 self.assertEquals("NT_STATUS_INVALID_HANDLE",
405 msg["Authentication"]["status"])
406 self.assertEquals(self.credentials.get_username(),
407 msg["Authentication"]["clientAccount"])
408 self.assertEquals("", msg["Authentication"]["clientDomain"])
410 # The 3rd message should be an NTLM_AUTH
# NOTE(review): the assignment of `msg` for this section is not visible.
412 self.assertEquals("Authentication", msg["type"])
413 self.assertTrue(msg["Authentication"]["authDescription"].startswith(
414 "NTLM_AUTH, wbinfo,"))
415 self.assertEquals("winbind",
416 msg["Authentication"]["serviceDescription"])
417 self.assertEquals("NTLMv1",
418 msg["Authentication"]["passwordType"])
419 # Logon type should be Network
420 self.assertEquals(3, msg["Authentication"]["logonType"])
421 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
422 # Event code should be successful logon
423 self.assertEquals(4624, msg["Authentication"]["eventId"])
424 self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
425 self.assertEquals("unix:", msg["Authentication"]["localAddress"])
426 self.assertEquals(self.credentials.get_username(),
427 msg["Authentication"]["clientAccount"])
428 self.assertEquals(self.credentials.get_domain(),
429 msg["Authentication"]["clientDomain"])
# Overwrites the earlier logon_id; the message read from the pipe below is
# compared against this one.
431 logon_id = msg["Authentication"]["logonId"]
433 # Now check the Domain server authentication message
# Reads at most 4096 bytes from the value returned by dc_watcher and
# decodes them as JSON.
435 message = os.read(pipe, 4096)
436 msg = json.loads(get_string(message))
437 self.assertEquals("Authentication", msg["type"])
438 self.assertEquals(logon_id, msg["Authentication"]["logonId"])
439 self.assertEquals("SamLogon",
440 msg["Authentication"]["serviceDescription"])
441 self.assertEquals("network",
442 msg["Authentication"]["authDescription"])