pyldb: avoid segfault when adding an element with no name
[sfrench/samba-autobuild/.git] / python / samba / tests / auth_log_winbind.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """
19     auth logging tests that exercise winbind
20 """
21
22 import json
23 import os
24 from random import SystemRandom
25 import string
26 import time
27
28 from samba.auth import system_session
29 from samba.credentials import Credentials
30 from samba.compat import get_string, get_bytes
31 from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
32 from samba.dsdb import UF_NORMAL_ACCOUNT
33 from samba.messaging import Messaging
34 from samba.param import LoadParm
35 from samba.samdb import SamDB
36 from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
37 from samba.tests.auth_log_base import AuthLogTestBase
38
39 USER_NAME = "WBALU"
40
41
42 class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
43
44     #
45     # Helper function to watch for authentication messages on the
46     # Domain Controller.
47     #
48     def dc_watcher(self):
49
50         (r1, w1) = os.pipe()
51         pid = os.fork()
52         if pid != 0:
53             # Parent process return the result socket to the caller.
54             return r1
55
56         # Load the lp context for the Domain Controller, rather than the
57         # member server.
58         config_file = os.environ["DC_SERVERCONFFILE"]
59         lp_ctx = LoadParm()
60         lp_ctx.load(config_file)
61
62         #
63         # Is the message a SamLogon authentication?
64         def is_sam_logon(m):
65             if m is None:
66                 return False
67             msg = json.loads(m)
68             return (
69                 msg["type"] == "Authentication" and
70                 msg["Authentication"]["serviceDescription"] == "SamLogon" and
71                 msg["Authentication"]["authDescription"] == "interactive")
72
73         #
74         # Handler function for received authentication messages.
75         def message_handler(context, msgType, src, message):
76             # Print the message to help debugging the tests.
77             # as it's a JSON message it does not look like a sub-unit message.
78             print(message)
79             self.dc_msg = message
80
81         # Set up a messaging context to listen for authentication events on
82         # the domain controller.
83         msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
84         msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
85         msg_handler_and_context = (message_handler, None)
86         msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
87
88         # Wait for the SamLogon message.
89         # As the SamLogon is the only message we're interested ignore any other
90         # messages.
91         self.dc_msg = None
92         start_time = time.time()
93         while not is_sam_logon(self.dc_msg) and (time.time() - start_time < 1):
94             msg_ctx.loop_once(0.1)
95
96         if self.dc_msg is None:
97             os.write(w1, get_bytes("None"))
98         else:
99             os.write(w1, get_bytes(self.dc_msg))
100
101         msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
102         msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
103
104         os._exit(0)
105
106     # Remove any DCE/RPC ncacn_np messages
107     # these only get triggered once per session, and stripping them out
108     # avoids ordering dependencies in the tests
109     #
110     def filter_messages(self, messages):
111         def keep(msg):
112             if (msg["type"] == "Authorization" and
113                 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
114                 msg["Authorization"]["authType"] == "ncacn_np"):
115                     return False
116             else:
117                 return True
118
119         return list(filter(keep, messages))
120
121     def setUp(self):
122         super(AuthLogTestsWinbind, self).setUp()
123         self.domain = os.environ["DOMAIN"]
124         self.host = os.environ["SERVER"]
125         self.dc = os.environ["DC_SERVER"]
126         self.lp = self.get_loadparm()
127         self.credentials = self.get_credentials()
128         self.session = system_session()
129
130         self.ldb = SamDB(
131             url="ldap://{0}".format(self.dc),
132             session_info=self.session,
133             credentials=self.credentials,
134             lp=self.lp)
135         self.create_user_account()
136
137     def tearDown(self):
138         super(AuthLogTestsWinbind, self).tearDown()
139         delete_force(self.ldb, self.user_dn)
140
141     #
142     # Create a test user account
143     def create_user_account(self):
144         self.user_pass = self.random_password()
145         self.user_name = USER_NAME
146         self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
147
148         # remove the account if it exists, this will happen if a previous test
149         # run failed
150         delete_force(self.ldb, self.user_dn)
151
152         utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
153         self.ldb.add({
154            "dn": self.user_dn,
155            "objectclass": "user",
156            "sAMAccountName": "%s" % self.user_name,
157            "userAccountControl": str(UF_NORMAL_ACCOUNT),
158            "unicodePwd": utf16pw})
159
160         self.user_creds = Credentials()
161         self.user_creds.guess(self.get_loadparm())
162         self.user_creds.set_password(self.user_pass)
163         self.user_creds.set_username(self.user_name)
164         self.user_creds.set_workstation(self.server)
165
166     def test_ntlm_auth(self):
167
168         def isLastExpectedMessage(msg):
169             DESC = "PAM_AUTH, ntlm_auth"
170             return (
171                 msg["type"] == "Authentication" and
172                 msg["Authentication"]["serviceDescription"] == "winbind" and
173                 msg["Authentication"]["authDescription"] is not None and
174                 msg["Authentication"]["authDescription"].startswith(DESC))
175
176         pipe = self.dc_watcher()
177         COMMAND = "bin/ntlm_auth"
178         self.check_run("{0} --username={1} --password={2}".format(
179             COMMAND,
180             self.credentials.get_username(),
181             self.credentials.get_password()),
182             msg="ntlm_auth failed")
183
184         messages = self.waitForMessages(isLastExpectedMessage)
185         messages = self.filter_messages(messages)
186         expected_messages = 1
187         self.assertEquals(expected_messages,
188                           len(messages),
189                           "Did not receive the expected number of messages")
190
191         # Check the first message it should be an Authentication
192         msg = messages[0]
193         self.assertEquals("Authentication", msg["type"])
194         self.assertTrue(
195             msg["Authentication"]["authDescription"].startswith(
196                 "PAM_AUTH, ntlm_auth,"))
197         self.assertEquals("winbind",
198                           msg["Authentication"]["serviceDescription"])
199         self.assertEquals("Plaintext", msg["Authentication"]["passwordType"])
200         # Logon type should be NetworkCleartext
201         self.assertEquals(8, msg["Authentication"]["logonType"])
202         # Event code should be Successful logon
203         self.assertEquals(4624, msg["Authentication"]["eventId"])
204         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
205         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
206         self.assertEquals(self.domain, msg["Authentication"]["clientDomain"])
207         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
208         self.assertEquals(self.credentials.get_username(),
209                           msg["Authentication"]["clientAccount"])
210         self.assertEquals(self.credentials.get_domain(),
211                           msg["Authentication"]["clientDomain"])
212         self.assertTrue(msg["Authentication"]["workstation"] is None)
213
214         logon_id = msg["Authentication"]["logonId"]
215
216         message = os.read(pipe, 4096)
217         msg = json.loads(get_string(message))
218         self.assertEquals("Authentication", msg["type"])
219         self.assertEquals(logon_id, msg["Authentication"]["logonId"])
220         self.assertEquals("SamLogon",
221                           msg["Authentication"]["serviceDescription"])
222         self.assertEquals("interactive",
223                           msg["Authentication"]["authDescription"])
224
225     def test_wbinfo(self):
226         def isLastExpectedMessage(msg):
227             DESC = "NTLM_AUTH, wbinfo"
228             return (
229                 msg["type"] == "Authentication" and
230                 msg["Authentication"]["serviceDescription"] == "winbind" and
231                 msg["Authentication"]["authDescription"] is not None and
232                 msg["Authentication"]["authDescription"].startswith(DESC))
233
234         pipe = self.dc_watcher()
235         COMMAND = "bin/wbinfo"
236         try:
237             self.check_run("{0} -a {1}%{2}".format(
238                 COMMAND,
239                 self.credentials.get_username(),
240                 self.credentials.get_password()),
241                 msg="ntlm_auth failed")
242         except BlackboxProcessError:
243             pass
244
245         messages = self.waitForMessages(isLastExpectedMessage)
246         messages = self.filter_messages(messages)
247         expected_messages = 3
248         self.assertEquals(expected_messages,
249                           len(messages),
250                           "Did not receive the expected number of messages")
251
252         # The 1st message should be an Authentication against the local
253         # password database
254         msg = messages[0]
255         self.assertEquals("Authentication", msg["type"])
256         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
257             "PASSDB, wbinfo,"))
258         self.assertEquals("winbind",
259                           msg["Authentication"]["serviceDescription"])
260         # Logon type should be Interactive
261         self.assertEquals(2, msg["Authentication"]["logonType"])
262         # Event code should be Unsuccessful logon
263         self.assertEquals(4625, msg["Authentication"]["eventId"])
264         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
265         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
266         self.assertEquals('', msg["Authentication"]["clientDomain"])
267         # This is what the existing winbind implementation returns.
268         self.assertEquals("NT_STATUS_NO_SUCH_USER",
269                           msg["Authentication"]["status"])
270         self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
271         self.assertEquals(self.credentials.get_username(),
272                           msg["Authentication"]["clientAccount"])
273         self.assertEquals("", msg["Authentication"]["clientDomain"])
274
275         logon_id = msg["Authentication"]["logonId"]
276
277         # The 2nd message should be a PAM_AUTH with the same logon id as the
278         # 1st message
279         msg = messages[1]
280         self.assertEquals("Authentication", msg["type"])
281         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
282             "PAM_AUTH"))
283         self.assertEquals("winbind",
284                           msg["Authentication"]["serviceDescription"])
285         self.assertEquals(logon_id, msg["Authentication"]["logonId"])
286         # Logon type should be NetworkCleartext
287         self.assertEquals(8, msg["Authentication"]["logonType"])
288         # Event code should be Unsuccessful logon
289         self.assertEquals(4625, msg["Authentication"]["eventId"])
290         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
291         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
292         self.assertEquals('', msg["Authentication"]["clientDomain"])
293         # This is what the existing winbind implementation returns.
294         self.assertEquals("NT_STATUS_INVALID_HANDLE",
295                           msg["Authentication"]["status"])
296         self.assertEquals(self.credentials.get_username(),
297                           msg["Authentication"]["clientAccount"])
298         self.assertEquals("", msg["Authentication"]["clientDomain"])
299
300         # The 3rd message should be an NTLM_AUTH
301         msg = messages[2]
302         self.assertEquals("Authentication", msg["type"])
303         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
304             "NTLM_AUTH, wbinfo,"))
305         self.assertEquals("winbind",
306                           msg["Authentication"]["serviceDescription"])
307         # Logon type should be Network
308         self.assertEquals(3, msg["Authentication"]["logonType"])
309         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
310         # Event code should be successful logon
311         self.assertEquals(4624, msg["Authentication"]["eventId"])
312         self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
313         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
314         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
315         self.assertEquals(self.credentials.get_username(),
316                           msg["Authentication"]["clientAccount"])
317         self.assertEquals(self.credentials.get_domain(),
318                           msg["Authentication"]["clientDomain"])
319
320         logon_id = msg["Authentication"]["logonId"]
321
322         #
323         # Now check the Domain server authentication message
324         #
325         message = os.read(pipe, 4096)
326         msg = json.loads(get_string(message))
327         self.assertEquals("Authentication", msg["type"])
328         self.assertEquals(logon_id, msg["Authentication"]["logonId"])
329         self.assertEquals("SamLogon",
330                           msg["Authentication"]["serviceDescription"])
331         self.assertEquals("network",
332                           msg["Authentication"]["authDescription"])
333
334     def test_wbinfo_ntlmv1(self):
335         def isLastExpectedMessage(msg):
336             DESC = "NTLM_AUTH, wbinfo"
337             return (
338                 msg["type"] == "Authentication" and
339                 msg["Authentication"]["serviceDescription"] == "winbind" and
340                 msg["Authentication"]["authDescription"] is not None and
341                 msg["Authentication"]["authDescription"].startswith(DESC))
342
343         pipe = self.dc_watcher()
344         COMMAND = "bin/wbinfo"
345         try:
346             self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
347                 COMMAND,
348                 self.credentials.get_username(),
349                 self.credentials.get_password()),
350                 msg="ntlm_auth failed")
351         except BlackboxProcessError:
352             pass
353
354         messages = self.waitForMessages(isLastExpectedMessage)
355         messages = self.filter_messages(messages)
356         expected_messages = 3
357         self.assertEquals(expected_messages,
358                           len(messages),
359                           "Did not receive the expected number of messages")
360
361         # The 1st message should be an Authentication against the local
362         # password database
363         msg = messages[0]
364         self.assertEquals("Authentication", msg["type"])
365         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
366             "PASSDB, wbinfo,"))
367         self.assertEquals("winbind",
368                           msg["Authentication"]["serviceDescription"])
369         # Logon type should be Interactive
370         self.assertEquals(2, msg["Authentication"]["logonType"])
371         # Event code should be Unsuccessful logon
372         self.assertEquals(4625, msg["Authentication"]["eventId"])
373         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
374         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
375         self.assertEquals('', msg["Authentication"]["clientDomain"])
376         # This is what the existing winbind implementation returns.
377         self.assertEquals("NT_STATUS_NO_SUCH_USER",
378                           msg["Authentication"]["status"])
379         self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
380         self.assertEquals(self.credentials.get_username(),
381                           msg["Authentication"]["clientAccount"])
382         self.assertEquals("", msg["Authentication"]["clientDomain"])
383
384         logon_id = msg["Authentication"]["logonId"]
385
386         # The 2nd message should be a PAM_AUTH with the same logon id as the
387         # 1st message
388         msg = messages[1]
389         self.assertEquals("Authentication", msg["type"])
390         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
391             "PAM_AUTH"))
392         self.assertEquals("winbind",
393                           msg["Authentication"]["serviceDescription"])
394         self.assertEquals(logon_id, msg["Authentication"]["logonId"])
395         self.assertEquals("Plaintext", msg["Authentication"]["passwordType"])
396         # Logon type should be NetworkCleartext
397         self.assertEquals(8, msg["Authentication"]["logonType"])
398         # Event code should be Unsuccessful logon
399         self.assertEquals(4625, msg["Authentication"]["eventId"])
400         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
401         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
402         self.assertEquals('', msg["Authentication"]["clientDomain"])
403         # This is what the existing winbind implementation returns.
404         self.assertEquals("NT_STATUS_INVALID_HANDLE",
405                           msg["Authentication"]["status"])
406         self.assertEquals(self.credentials.get_username(),
407                           msg["Authentication"]["clientAccount"])
408         self.assertEquals("", msg["Authentication"]["clientDomain"])
409
410         # The 3rd message should be an NTLM_AUTH
411         msg = messages[2]
412         self.assertEquals("Authentication", msg["type"])
413         self.assertTrue(msg["Authentication"]["authDescription"].startswith(
414             "NTLM_AUTH, wbinfo,"))
415         self.assertEquals("winbind",
416                           msg["Authentication"]["serviceDescription"])
417         self.assertEquals("NTLMv1",
418                           msg["Authentication"]["passwordType"])
419         # Logon type should be Network
420         self.assertEquals(3, msg["Authentication"]["logonType"])
421         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
422         # Event code should be successful logon
423         self.assertEquals(4624, msg["Authentication"]["eventId"])
424         self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
425         self.assertEquals("unix:", msg["Authentication"]["localAddress"])
426         self.assertEquals(self.credentials.get_username(),
427                           msg["Authentication"]["clientAccount"])
428         self.assertEquals(self.credentials.get_domain(),
429                           msg["Authentication"]["clientDomain"])
430
431         logon_id = msg["Authentication"]["logonId"]
432         #
433         # Now check the Domain server authentication message
434         #
435         message = os.read(pipe, 4096)
436         msg = json.loads(get_string(message))
437         self.assertEquals("Authentication", msg["type"])
438         self.assertEquals(logon_id, msg["Authentication"]["logonId"])
439         self.assertEquals("SamLogon",
440                           msg["Authentication"]["serviceDescription"])
441         self.assertEquals("network",
442                           msg["Authentication"]["authDescription"])