CVE-2023-4154 dsdb: Remove remaining references to DC_MODE_RETURN_NONE and DC_MODE_RE...
[samba.git] / source4 / dsdb / tests / python / confidential_attr.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Tests that confidential attributes (or attributes protected by a ACL that
5 # denies read access) cannot be guessed through wildcard DB searches.
6 #
7 # Copyright (C) Catalyst.Net Ltd
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25
26 import samba
27 import os
28 import random
29 import statistics
30 import time
31 from samba.tests.subunitrun import SubunitOptions, TestProgram
32 import samba.getopt as options
33 from ldb import SCOPE_BASE, SCOPE_SUBTREE
34 from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_PRESERVEONDELETE
35 from ldb import Message, MessageElement, Dn
36 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
41 import samba.tests
42 import samba.dsdb
43
44 parser = optparse.OptionParser("confidential_attr.py [options] <host>")
45 sambaopts = options.SambaOptions(parser)
46 parser.add_option_group(sambaopts)
47 parser.add_option_group(options.VersionOptions(parser))
48
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54
55 opts, args = parser.parse_args()
56
57 if len(args) < 1:
58     parser.print_usage()
59     sys.exit(1)
60
61 host = args[0]
62 if "://" not in host:
63     ldaphost = "ldap://%s" % host
64 else:
65     ldaphost = host
66     start = host.rindex("://")
67     host = host.lstrip(start + 3)
68
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
71 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
72
73 #
74 # Tests start here
75 #
76 class ConfidentialAttrCommon(samba.tests.TestCase):
77
78     def setUp(self):
79         super(ConfidentialAttrCommon, self).setUp()
80
81         self.ldb_admin = SamDB(ldaphost, credentials=creds,
82                                session_info=system_session(lp), lp=lp)
83         self.user_pass = "samba123@"
84         self.base_dn = self.ldb_admin.domain_dn()
85         self.schema_dn = self.ldb_admin.get_schema_basedn()
86         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
87
88         # the tests work by setting the 'Confidential' bit in the searchFlags
89         # for an existing schema attribute. This only works against Windows if
90         # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
91         # schema attribute being modified. There are only a few attributes that
92         # meet this criteria (most of which only apply to 'user' objects)
93         self.conf_attr = "homePostalAddress"
94         attr_cn = "CN=Address-Home"
95         # schemaIdGuid for homePostalAddress (used for ACE tests)
96         self.conf_attr_guid = "16775781-47f3-11d1-a9c3-0000f80367c1"
97         self.conf_attr_sec_guid = "77b5b886-944a-11d1-aebd-0000f80367c1"
98         self.attr_dn = "{0},{1}".format(attr_cn, self.schema_dn)
99
100         userou = "OU=conf-attr-test"
101         self.ou = "{0},{1}".format(userou, self.base_dn)
102         self.ldb_admin.create_ou(self.ou)
103
104         # use a common username prefix, so we can use sAMAccountName=CATC-* as
105         # a search filter to only return the users we're interested in
106         self.user_prefix = "catc-"
107
108         # add a test object with this attribute set
109         self.conf_value = "abcdef"
110         self.conf_user = "{0}conf-user".format(self.user_prefix)
111         self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
112         self.conf_dn = self.get_user_dn(self.conf_user)
113         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
114
115         # add a sneaky user that will try to steal our secrets
116         self.user = "{0}sneaky-user".format(self.user_prefix)
117         self.ldb_admin.newuser(self.user, self.user_pass, userou=userou)
118         self.ldb_user = self.get_ldb_connection(self.user, self.user_pass)
119
120         self.all_users = [self.user, self.conf_user]
121
122         # add some other users that also have confidential attributes, so we
123         # check we don't disclose their details, particularly in '!' searches
124         for i in range(1, 3):
125             username = "{0}other-user{1}".format(self.user_prefix, i)
126             self.ldb_admin.newuser(username, self.user_pass, userou=userou)
127             userdn = self.get_user_dn(username)
128             self.add_attr(userdn, self.conf_attr, "xyz{0}".format(i))
129             self.all_users.append(username)
130
131         # there are 4 users in the OU, plus the OU itself
132         self.test_dn = self.ou
133         self.total_objects = len(self.all_users) + 1
134         self.objects_with_attr = 3
135
136         # sanity-check the flag is not already set (this'll cause problems if
137         # previous test run didn't clean up properly)
138         search_flags = self.get_attr_search_flags(self.attr_dn)
139         self.assertTrue(int(search_flags) & SEARCH_FLAG_CONFIDENTIAL == 0,
140                         "{0} searchFlags already {1}".format(self.conf_attr,
141                                                              search_flags))
142
143     def tearDown(self):
144         super(ConfidentialAttrCommon, self).tearDown()
145         self.ldb_admin.delete(self.ou, ["tree_delete:1"])
146
147     def add_attr(self, dn, attr, value):
148         m = Message()
149         m.dn = Dn(self.ldb_admin, dn)
150         m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
151         self.ldb_admin.modify(m)
152
153     def set_attr_search_flags(self, attr_dn, flags):
154         """Modifies the searchFlags for an object in the schema"""
155         m = Message()
156         m.dn = Dn(self.ldb_admin, attr_dn)
157         m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
158                                           'searchFlags')
159         self.ldb_admin.modify(m)
160
161         # note we have to update the schema for this change to take effect (on
162         # Windows, at least)
163         self.ldb_admin.set_schema_update_now()
164
165     def get_attr_search_flags(self, attr_dn):
166         """Marks the attribute under test as being confidential"""
167         res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
168                                     attrs=['searchFlags'])
169         return res[0]['searchFlags'][0]
170
171     def make_attr_confidential(self):
172         """Marks the attribute under test as being confidential"""
173
174         # work out the original 'searchFlags' value before we overwrite it
175         old_value = self.get_attr_search_flags(self.attr_dn)
176
177         self.set_attr_search_flags(self.attr_dn, str(SEARCH_FLAG_CONFIDENTIAL))
178
179         # reset the value after the test completes
180         self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
181
182     def get_user_dn(self, name):
183         return "CN={0},{1}".format(name, self.ou)
184
185     def get_user_sid_string(self, username):
186         user_dn = self.get_user_dn(username)
187         user_sid = self.sd_utils.get_object_sid(user_dn)
188         return str(user_sid)
189
190     def get_ldb_connection(self, target_username, target_password):
191         creds_tmp = Credentials()
192         creds_tmp.set_username(target_username)
193         creds_tmp.set_password(target_password)
194         creds_tmp.set_domain(creds.get_domain())
195         creds_tmp.set_realm(creds.get_realm())
196         creds_tmp.set_workstation(creds.get_workstation())
197         features = creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL
198         creds_tmp.set_gensec_features(features)
199         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
200         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
201         return ldb_target
202
203     def assert_search_result(self, expected_num, expr, samdb):
204
205         # try asking for different attributes back: None/all, the confidential
206         # attribute itself, and a random unrelated attribute
207         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
208         for attr in attr_filters:
209             res = samdb.search(self.test_dn, expression=expr,
210                                scope=SCOPE_SUBTREE, attrs=attr)
211             self.assertTrue(len(res) == expected_num,
212                             "%u results, not %u for search %s, attr %s" %
213                             (len(res), expected_num, expr, str(attr)))
214
215     # return a selection of searches that match exactly against the test object
216     def get_exact_match_searches(self):
217         first_char = self.conf_value[:1]
218         last_char = self.conf_value[-1:]
219         test_attr = self.conf_attr
220
221         searches = [
222             # search for the attribute using a sub-string wildcard
223             # (which could reveal the attribute's actual value)
224             "({0}={1}*)".format(test_attr, first_char),
225             "({0}=*{1})".format(test_attr, last_char),
226
227             # sanity-check equality against an exact match on value
228             "({0}={1})".format(test_attr, self.conf_value),
229
230             # '~=' searches don't work against Samba
231             # sanity-check an approx search against an exact match on value
232             # "({0}~={1})".format(test_attr, self.conf_value),
233
234             # check wildcard in an AND search...
235             "(&({0}={1}*)(objectclass=*))".format(test_attr, first_char),
236
237             # ...an OR search (against another term that will never match)
238             "(|({0}={1}*)(objectclass=banana))".format(test_attr, first_char)]
239
240         return searches
241
242     # return searches that match any object with the attribute under test
243     def get_match_all_searches(self):
244         searches = [
245             # check a full wildcard against the confidential attribute
246             # (which could reveal the attribute's presence/absence)
247             "({0}=*)".format(self.conf_attr),
248
249             # check wildcard in an AND search...
250             "(&(objectclass=*)({0}=*))".format(self.conf_attr),
251
252             # ...an OR search (against another term that will never match)
253             "(|(objectclass=banana)({0}=*))".format(self.conf_attr),
254
255             # check <=, and >= expressions that would normally find a match
256             "({0}>=0)".format(self.conf_attr),
257             "({0}<=ZZZZZZZZZZZ)".format(self.conf_attr)]
258
259         return searches
260
261     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
262         """Check searches against the attribute under test work as expected"""
263
264         if samdb is None:
265             samdb = self.ldb_user
266
267         if has_rights_to == "all":
268             has_rights_to = self.objects_with_attr
269
270         # these first few searches we just expect to match against the one
271         # object under test that we're trying to guess the value of
272         expected_num = 1 if has_rights_to > 0 else 0
273         for search in self.get_exact_match_searches():
274             self.assert_search_result(expected_num, search, samdb)
275
276         # these next searches will match any objects we have rights to see
277         expected_num = has_rights_to
278         for search in self.get_match_all_searches():
279             self.assert_search_result(expected_num, search, samdb)
280
281     # The following are double negative searches (i.e. NOT non-matching-
282     # condition) which will therefore match ALL objects, including the test
283     # object(s).
284     def get_negative_match_all_searches(self):
285         first_char = self.conf_value[:1]
286         last_char = self.conf_value[-1:]
287         not_first_char = chr(ord(first_char) + 1)
288         not_last_char = chr(ord(last_char) + 1)
289
290         searches = [
291             "(!({0}={1}*))".format(self.conf_attr, not_first_char),
292             "(!({0}=*{1}))".format(self.conf_attr, not_last_char)]
293         return searches
294
295     # the following searches will not match against the test object(s). So
296     # a user with sufficient rights will see an inverse sub-set of objects.
297     # (An unprivileged user would either see all objects on Windows, or no
298     # objects on Samba)
299     def get_inverse_match_searches(self):
300         first_char = self.conf_value[:1]
301         last_char = self.conf_value[-1:]
302         searches = [
303             "(!({0}={1}*))".format(self.conf_attr, first_char),
304             "(!({0}=*{1}))".format(self.conf_attr, last_char)]
305         return searches
306
307     def negative_searches_all_rights(self, total_objects=None):
308         expected_results = {}
309
310         if total_objects is None:
311             total_objects = self.total_objects
312
313         # these searches should match ALL objects (including the OU)
314         for search in self.get_negative_match_all_searches():
315             expected_results[search] = total_objects
316
317         # a ! wildcard should only match the objects without the attribute
318         search = "(!({0}=*))".format(self.conf_attr)
319         expected_results[search] = total_objects - self.objects_with_attr
320
321         # whereas the inverse searches should match all objects *except* the
322         # one under test
323         for search in self.get_inverse_match_searches():
324             expected_results[search] = total_objects - 1
325
326         return expected_results
327
328     # Returns the expected negative (i.e. '!') search behaviour when talking to
329     # a DC, i.e. we assert that users
330     # without rights always see ALL objects in '!' searches
331     def negative_searches_return_all(self, has_rights_to=0,
332                                      total_objects=None):
333         """Asserts user without rights cannot see objects in '!' searches"""
334         expected_results = {}
335
336         if total_objects is None:
337             total_objects = self.total_objects
338
339         # Windows 'hides' objects by always returning all of them, so negative
340         # searches that match all objects will simply return all objects
341         for search in self.get_negative_match_all_searches():
342             expected_results[search] = total_objects
343
344         # if we're matching on everything except the one object under test
345         # (i.e. the inverse subset), we'll still see all objects if
346         # has_rights_to == 0. Or we'll see all bar one if has_rights_to == 1.
347         inverse_searches = self.get_inverse_match_searches()
348         inverse_searches += ["(!({0}=*))".format(self.conf_attr)]
349
350         for search in inverse_searches:
351             expected_results[search] = total_objects - has_rights_to
352
353         return expected_results
354
355     # Returns the expected negative (i.e. '!') search behaviour when talking to
356     # a DC with DC_MODE_RETURN_NONE behaviour, i.e. we assert that users
357     # without rights cannot see objects in '!' searches at all
358     def negative_searches_return_none(self, has_rights_to=0):
359         expected_results = {}
360
361         # the 'match-all' searches should only return the objects we have
362         # access rights to (if any)
363         for search in self.get_negative_match_all_searches():
364             expected_results[search] = has_rights_to
365
366         # for inverse matches, we should NOT be told about any objects at all
367         inverse_searches = self.get_inverse_match_searches()
368         inverse_searches += ["(!({0}=*))".format(self.conf_attr)]
369         for search in inverse_searches:
370             expected_results[search] = 0
371
372         return expected_results
373
374     # Returns the expected negative (i.e. '!') search behaviour. This varies
375     # depending on what type of DC we're talking to (i.e. Windows or Samba)
376     # and what access rights the user has.
377     # Note we only handle has_rights_to="all", 1 (the test object), or 0 (i.e.
378     # we don't have rights to any objects)
379     def negative_search_expected_results(self, has_rights_to, total_objects=None):
380
381         if has_rights_to == "all":
382             expect_results = self.negative_searches_all_rights(total_objects)
383
384         else:
385             expect_results = self.negative_searches_return_all(has_rights_to,
386                                                                total_objects)
387         return expect_results
388
389     def assert_negative_searches(self, has_rights_to=0, samdb=None):
390         """Asserts user without rights cannot see objects in '!' searches"""
391
392         if samdb is None:
393             samdb = self.ldb_user
394
395         # build a dictionary of key=search-expr, value=expected_num assertions
396         expected_results = self.negative_search_expected_results(has_rights_to)
397
398         for search, expected_num in expected_results.items():
399             self.assert_search_result(expected_num, search, samdb)
400
401     def assert_attr_returned(self, expect_attr, samdb, attrs):
402         # does a query that should always return a successful result, and
403         # checks whether the confidential attribute is present
404         res = samdb.search(self.conf_dn, expression="(objectClass=*)",
405                            scope=SCOPE_SUBTREE, attrs=attrs)
406         self.assertTrue(len(res) == 1)
407
408         attr_returned = False
409         for msg in res:
410             if self.conf_attr in msg:
411                 attr_returned = True
412         self.assertEqual(expect_attr, attr_returned)
413
414     def assert_attr_visible(self, expect_attr, samdb=None):
415         if samdb is None:
416             samdb = self.ldb_user
417
418         # sanity-check confidential attribute is/isn't returned as expected
419         # based on the filter attributes we ask for
420         self.assert_attr_returned(expect_attr, samdb, attrs=None)
421         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
422         self.assert_attr_returned(expect_attr, samdb, attrs=[self.conf_attr])
423
424         # filtering on a different attribute should never return the conf_attr
425         self.assert_attr_returned(expect_attr=False, samdb=samdb,
426                                   attrs=['name'])
427
428     def assert_attr_visible_to_admin(self):
429         # sanity-check the admin user can always see the confidential attribute
430         self.assert_conf_attr_searches(has_rights_to="all",
431                                        samdb=self.ldb_admin)
432         self.assert_negative_searches(has_rights_to="all",
433                                       samdb=self.ldb_admin)
434         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
435
436
437 class ConfidentialAttrTest(ConfidentialAttrCommon):
438     def test_basic_search(self):
439         """Basic test confidential attributes aren't disclosed via searches"""
440
441         # check we can see a non-confidential attribute in a basic searches
442         self.assert_conf_attr_searches(has_rights_to="all")
443         self.assert_negative_searches(has_rights_to="all")
444         self.assert_attr_visible(expect_attr=True)
445
446         # now make the attribute confidential. Repeat the tests and check that
447         # an ordinary user can't see the attribute, or indirectly match on the
448         # attribute via the search expression
449         self.make_attr_confidential()
450
451         self.assert_conf_attr_searches(has_rights_to=0)
452         self.assert_negative_searches(has_rights_to=0)
453         self.assert_attr_visible(expect_attr=False)
454
455         # sanity-check we haven't hidden the attribute from the admin as well
456         self.assert_attr_visible_to_admin()
457
458     def _test_search_with_allow_acl(self, allow_ace):
459         """Checks a ACE with 'CR' rights can override a confidential attr"""
460         # make the test attribute confidential and check user can't see it
461         self.make_attr_confidential()
462
463         self.assert_conf_attr_searches(has_rights_to=0)
464         self.assert_negative_searches(has_rights_to=0)
465         self.assert_attr_visible(expect_attr=False)
466
467         # apply the allow ACE to the object under test
468         self.sd_utils.dacl_add_ace(self.conf_dn, allow_ace)
469
470         # the user should now be able to see the attribute for the one object
471         # we gave it rights to
472         self.assert_conf_attr_searches(has_rights_to=1)
473         self.assert_negative_searches(has_rights_to=1)
474         self.assert_attr_visible(expect_attr=True)
475
476         # sanity-check the admin can still see the attribute
477         self.assert_attr_visible_to_admin()
478
479     def test_search_with_attr_acl_override(self):
480         """Make the confidential attr visible via an OA attr ACE"""
481
482         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
483         # attribute under test, so the user can see it once more
484         user_sid = self.get_user_sid_string(self.user)
485         ace = "(OA;;CR;{0};;{1})".format(self.conf_attr_guid, user_sid)
486
487         self._test_search_with_allow_acl(ace)
488
489     def test_search_with_propset_acl_override(self):
490         """Make the confidential attr visible via a Property-set ACE"""
491
492         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
493         # property-set containing the attribute under test (i.e. the
494         # attributeSecurityGuid), so the user can see it once more
495         user_sid = self.get_user_sid_string(self.user)
496         ace = "(OA;;CR;{0};;{1})".format(self.conf_attr_sec_guid, user_sid)
497
498         self._test_search_with_allow_acl(ace)
499
500     def test_search_with_acl_override(self):
501         """Make the confidential attr visible via a general 'allow' ACE"""
502
503         # set the allow SEC_ADS_CONTROL_ACCESS bit ('CR') for the user
504         user_sid = self.get_user_sid_string(self.user)
505         ace = "(A;;CR;;;{0})".format(user_sid)
506
507         self._test_search_with_allow_acl(ace)
508
509     def test_search_with_blanket_oa_acl(self):
510         """Make the confidential attr visible via a non-specific OA ACE"""
511
512         # this just checks that an Object Access (OA) ACE without a GUID
513         # specified will work the same as an 'Access' (A) ACE
514         user_sid = self.get_user_sid_string(self.user)
515         ace = "(OA;;CR;;;{0})".format(user_sid)
516
517         self._test_search_with_allow_acl(ace)
518
519     def _test_search_with_neutral_acl(self, neutral_ace):
520         """Checks that a user does NOT gain access via an unrelated ACE"""
521
522         # make the test attribute confidential and check user can't see it
523         self.make_attr_confidential()
524
525         self.assert_conf_attr_searches(has_rights_to=0)
526         self.assert_negative_searches(has_rights_to=0)
527         self.assert_attr_visible(expect_attr=False)
528
529         # apply the ACE to the object under test
530         self.sd_utils.dacl_add_ace(self.conf_dn, neutral_ace)
531
532         # this should make no difference to the user's ability to see the attr
533         self.assert_conf_attr_searches(has_rights_to=0)
534         self.assert_negative_searches(has_rights_to=0)
535         self.assert_attr_visible(expect_attr=False)
536
537         # sanity-check the admin can still see the attribute
538         self.assert_attr_visible_to_admin()
539
540     def test_search_with_neutral_acl(self):
541         """Give the user all rights *except* CR for any attributes"""
542
543         # give the user all rights *except* CR and check it makes no difference
544         user_sid = self.get_user_sid_string(self.user)
545         ace = "(A;;RPWPCCDCLCLORCWOWDSDDTSW;;;{0})".format(user_sid)
546         self._test_search_with_neutral_acl(ace)
547
548     def test_search_with_neutral_attr_acl(self):
549         """Give the user all rights *except* CR for the attribute under test"""
550
551         # giving user all OA rights *except* CR should make no difference
552         user_sid = self.get_user_sid_string(self.user)
553         rights = "RPWPCCDCLCLORCWOWDSDDTSW"
554         ace = "(OA;;{0};{1};;{2})".format(rights, self.conf_attr_guid, user_sid)
555         self._test_search_with_neutral_acl(ace)
556
557     def test_search_with_neutral_cr_acl(self):
558         """Give the user CR rights for *another* unrelated attribute"""
559
560         # giving user object-access CR rights to an unrelated attribute
561         user_sid = self.get_user_sid_string(self.user)
562         # use the GUID for sAMAccountName here (for no particular reason)
563         unrelated_attr = "3e0abfd0-126a-11d0-a060-00aa006c33ed"
564         ace = "(OA;;CR;{0};;{1})".format(unrelated_attr, user_sid)
565         self._test_search_with_neutral_acl(ace)
566
567
568 # Check that a Deny ACL on an attribute doesn't reveal confidential info
569 class ConfidentialAttrTestDenyAcl(ConfidentialAttrCommon):
570
571     def assert_not_in_result(self, res, exclude_dn):
572         for msg in res:
573             self.assertNotEqual(msg.dn, exclude_dn,
574                                 "Search revealed object {0}".format(exclude_dn))
575
576     # deny ACL tests are slightly different as we are only denying access to
577     # the one object under test (rather than any objects with that attribute).
578     # Therefore we need an extra check that we don't reveal the test object
579     # in the search, if we're not supposed to
580     def assert_search_result(self, expected_num, expr, samdb,
581                              excl_testobj=False):
582
583         # try asking for different attributes back: None/all, the confidential
584         # attribute itself, and a random unrelated attribute
585         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
586         for attr in attr_filters:
587             res = samdb.search(self.test_dn, expression=expr,
588                                scope=SCOPE_SUBTREE, attrs=attr)
589             self.assertTrue(len(res) == expected_num,
590                             "%u results, not %u for search %s, attr %s" %
591                             (len(res), expected_num, expr, str(attr)))
592
593             # assert we haven't revealed the hidden test-object
594             if excl_testobj:
595                 self.assert_not_in_result(res, exclude_dn=self.conf_dn)
596
597     # we make a few tweaks to the regular version of this function to cater to
598     # denying specifically one object via an ACE
599     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
600         """Check searches against the attribute under test work as expected"""
601
602         if samdb is None:
603             samdb = self.ldb_user
604
605         # make sure the test object is not returned if we've been denied rights
606         # to it via an ACE
607         excl_testobj = True if has_rights_to == "deny-one" else False
608
609         # these first few searches we just expect to match against the one
610         # object under test that we're trying to guess the value of
611         expected_num = 1 if has_rights_to == "all" else 0
612
613         for search in self.get_exact_match_searches():
614             self.assert_search_result(expected_num, search, samdb,
615                                       excl_testobj)
616
617         # these next searches will match any objects with the attribute that
618         # we have rights to see (i.e. all except the object under test)
619         if has_rights_to == "all":
620             expected_num = self.objects_with_attr
621         elif has_rights_to == "deny-one":
622             expected_num = self.objects_with_attr - 1
623
624         for search in self.get_match_all_searches():
625             self.assert_search_result(expected_num, search, samdb,
626                                       excl_testobj)
627
628     # override method specifically for deny ACL test cases. Instead of being
629     # granted access to either no objects or only one, we are being denied
630     # access to only one object (but can still access the rest).
631     def negative_searches_return_none(self, has_rights_to=0):
632         expected_results = {}
633
634         # on Samba we will see the objects we have rights to, but the one we
635         # are denied access to will be hidden
636         searches = self.get_negative_match_all_searches()
637         searches += self.get_inverse_match_searches()
638         for search in searches:
639             expected_results[search] = self.total_objects - 1
640
641         # The wildcard returns the objects without this attribute as normal.
642         search = "(!({0}=*))".format(self.conf_attr)
643         expected_results[search] = self.total_objects - self.objects_with_attr
644         return expected_results
645
646     # override method specifically for deny ACL test cases
647     def negative_searches_return_all(self, has_rights_to=0,
648                                      total_objects=None):
649         expected_results = {}
650
651         # When a user lacks access rights to an object, Windows 'hides' it in
652         # '!' searches by always returning it, regardless of whether it matches
653         searches = self.get_negative_match_all_searches()
654         searches += self.get_inverse_match_searches()
655         for search in searches:
656             expected_results[search] = self.total_objects
657
658         # in the wildcard case, the one object we don't have rights to gets
659         # bundled in with the objects that don't have the attribute at all
660         search = "(!({0}=*))".format(self.conf_attr)
661         has_rights_to = self.objects_with_attr - 1
662         expected_results[search] = self.total_objects - has_rights_to
663         return expected_results
664
665     # override method specifically for deny ACL test cases
666     def assert_negative_searches(self, has_rights_to=0, samdb=None):
667         """Asserts user without rights cannot see objects in '!' searches"""
668
669         if samdb is None:
670             samdb = self.ldb_user
671
672         # As the deny ACL is only denying access to one particular object, add
673         # an extra check that the denied object is not returned. (We can only
674         # assert this if the '!'/negative search behaviour is to suppress any
675         # objects we don't have access rights to)
676         excl_testobj = False
677
678         # build a dictionary of key=search-expr, value=expected_num assertions
679         expected_results = self.negative_search_expected_results(has_rights_to)
680
681         for search, expected_num in expected_results.items():
682             self.assert_search_result(expected_num, search, samdb,
683                                       excl_testobj=excl_testobj)
684
685     def _test_search_with_deny_acl(self, ace):
686         # check the user can see the attribute initially
687         self.assert_conf_attr_searches(has_rights_to="all")
688         self.assert_negative_searches(has_rights_to="all")
689         self.assert_attr_visible(expect_attr=True)
690
691         # add the ACE that denies access to the attr under test
692         self.sd_utils.dacl_add_ace(self.conf_dn, ace)
693
694         # the user shouldn't be able to see the attribute anymore
695         self.assert_conf_attr_searches(has_rights_to="deny-one")
696         self.assert_negative_searches(has_rights_to="deny-one")
697         self.assert_attr_visible(expect_attr=False)
698
699         # sanity-check we haven't hidden the attribute from the admin as well
700         self.assert_attr_visible_to_admin()
701
702     def test_search_with_deny_attr_acl(self):
703         """Checks a deny ACE works the same way as a confidential attribute"""
704
705         # add an ACE that denies the user Read Property (RP) access to the attr
706         # (which is similar to making the attribute confidential)
707         user_sid = self.get_user_sid_string(self.user)
708         ace = "(OD;;RP;{0};;{1})".format(self.conf_attr_guid, user_sid)
709
710         # check the user cannot see the attribute anymore
711         self._test_search_with_deny_acl(ace)
712
713     def test_search_with_deny_acl(self):
714         """Checks a blanket deny ACE denies access to an object's attributes"""
715
716         # add an blanket deny ACE for Read Property (RP) rights
717         user_dn = self.get_user_dn(self.user)
718         user_sid = self.sd_utils.get_object_sid(user_dn)
719         ace = "(D;;RP;;;{0})".format(str(user_sid))
720
721         # check the user cannot see the attribute anymore
722         self._test_search_with_deny_acl(ace)
723
724     def test_search_with_deny_propset_acl(self):
725         """Checks a deny ACE on the attribute's Property-Set"""
726
727         # add an blanket deny ACE for Read Property (RP) rights
728         user_sid = self.get_user_sid_string(self.user)
729         ace = "(OD;;RP;{0};;{1})".format(self.conf_attr_sec_guid, user_sid)
730
731         # check the user cannot see the attribute anymore
732         self._test_search_with_deny_acl(ace)
733
734     def test_search_with_blanket_oa_deny_acl(self):
735         """Checks a non-specific 'OD' ACE works the same as a 'D' ACE"""
736
737         # this just checks that adding a 'Object Deny' (OD) ACE without
738         # specifying a GUID will work the same way as a 'Deny' (D) ACE
739         user_sid = self.get_user_sid_string(self.user)
740         ace = "(OD;;RP;;;{0})".format(user_sid)
741
742         # check the user cannot see the attribute anymore
743         self._test_search_with_deny_acl(ace)
744
745
746 # Check that using the dirsync controls doesn't reveal confidential attributes
747 class ConfidentialAttrTestDirsync(ConfidentialAttrCommon):
748
749     def setUp(self):
750         super(ConfidentialAttrTestDirsync, self).setUp()
751         self.dirsync = ["dirsync:1:1:1000"]
752
753         # because we need to search on the base DN when using the dirsync
754         # controls, we need an extra filter for the inverse ('!') search,
755         # so we don't get thousands of objects returned
756         self.extra_filter = \
757             "(&(samaccountname={0}*)(!(isDeleted=*)))".format(self.user_prefix)
758         self.single_obj_filter = \
759             "(&(samaccountname={0})(!(isDeleted=*)))".format(self.conf_user)
760
761         self.attr_filters = [None, ["*"], ["name"]]
762
763         # Note dirsync behaviour is slighty different for the attribute under
764         # test - when you have full access rights, it only returns the objects
765         # that actually have this attribute (i.e. it doesn't return an empty
766         # message with just the DN). So we add the 'name' attribute into the
767         # attribute filter to avoid complicating our assertions further
768         self.attr_filters += [[self.conf_attr, "name"]]
769
770     # override method specifically for dirsync, i.e. add dirsync controls
771     def assert_search_result(self, expected_num, expr, samdb, base_dn=None):
772
773         # Note dirsync must always search on the partition base DN
774         base_dn = self.base_dn
775
776         # we need an extra filter for dirsync because:
777         # - we search on the base DN, so otherwise the '!' searches return
778         #   thousands of unrelated results, and
779         # - we make the test attribute preserve-on-delete in one case, so we
780         #   want to weed out results from any previous test runs
781         search = "(&{0}{1})".format(expr, self.extra_filter)
782
783         for attr in self.attr_filters:
784             res = samdb.search(base_dn, expression=search, scope=SCOPE_SUBTREE,
785                                attrs=attr, controls=self.dirsync)
786             self.assertTrue(len(res) == expected_num,
787                             "%u results, not %u for search %s, attr %s" %
788                             (len(res), expected_num, search, str(attr)))
789
790     # override method specifically for dirsync, i.e. add dirsync controls
791     def assert_attr_returned(self, expect_attr, samdb, attrs,
792                              no_result_ok=False):
793
794         # When using dirsync, the base DN we search on needs to be a naming
795         # context. Add an extra filter to ignore all the objects we aren't
796         # interested in
797         expr = self.single_obj_filter
798         res = samdb.search(self.base_dn, expression=expr, scope=SCOPE_SUBTREE,
799                            attrs=attrs, controls=self.dirsync)
800         self.assertTrue(len(res) == 1 or no_result_ok)
801
802         attr_returned = False
803         for msg in res:
804             if self.conf_attr in msg and len(msg[self.conf_attr]) > 0:
805                 attr_returned = True
806         self.assertEqual(expect_attr, attr_returned)
807
808     # override method specifically for dirsync (it has slightly different
809     # behaviour to normal when requesting specific attributes)
810     def assert_attr_visible(self, expect_attr, samdb=None):
811         if samdb is None:
812             samdb = self.ldb_user
813
814         # sanity-check confidential attribute is/isn't returned as expected
815         # based on the filter attributes we ask for
816         self.assert_attr_returned(expect_attr, samdb, attrs=None)
817         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
818
819         if expect_attr:
820             self.assert_attr_returned(expect_attr, samdb,
821                                       attrs=[self.conf_attr])
822         else:
823             # The behaviour with dirsync when asking solely for an attribute
824             # that you don't have rights to is a bit strange. Samba returns
825             # no result rather than an empty message with just the DN.
826             # Presumably this is due to dirsync module behaviour. It's not
827             # disclosive in that the DC behaves the same way as if you asked
828             # for a garbage/non-existent attribute
829             self.assert_attr_returned(expect_attr, samdb,
830                                       attrs=[self.conf_attr],
831                                       no_result_ok=True)
832             self.assert_attr_returned(expect_attr, samdb,
833                                       attrs=["garbage"], no_result_ok=True)
834
835         # filtering on a different attribute should never return the conf_attr
836         self.assert_attr_returned(expect_attr=False, samdb=samdb,
837                                   attrs=['name'])
838
839     # override method specifically for dirsync (total object count differs)
840     def assert_negative_searches(self, has_rights_to=0, samdb=None):
841         """Asserts user without rights cannot see objects in '!' searches"""
842
843         if samdb is None:
844             samdb = self.ldb_user
845
846         # because dirsync uses an extra filter, the total objects we expect
847         # here only includes the user objects (not the parent OU)
848         total_objects = len(self.all_users)
849         expected_results = self.negative_search_expected_results(has_rights_to,
850                                                                  total_objects)
851
852         for search, expected_num in expected_results.items():
853             self.assert_search_result(expected_num, search, samdb)
854
855     def test_search_with_dirsync(self):
856         """Checks dirsync controls don't reveal confidential attributes"""
857
858         self.assert_conf_attr_searches(has_rights_to="all")
859         self.assert_attr_visible(expect_attr=True)
860         self.assert_negative_searches(has_rights_to="all")
861
862         # make the test attribute confidential and check user can't see it,
863         # even if they use the dirsync controls
864         self.make_attr_confidential()
865
866         self.assert_conf_attr_searches(has_rights_to=0)
867         self.assert_attr_visible(expect_attr=False)
868         self.assert_negative_searches(has_rights_to=0)
869
870         # as a final sanity-check, make sure the admin can still see the attr
871         self.assert_conf_attr_searches(has_rights_to="all",
872                                        samdb=self.ldb_admin)
873         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
874         self.assert_negative_searches(has_rights_to="all",
875                                       samdb=self.ldb_admin)
876
877     def get_guid_string(self, dn):
878         """Returns an object's GUID (in string format)"""
879         res = self.ldb_admin.search(base=dn, attrs=["objectGUID"],
880                                     scope=SCOPE_BASE)
881         guid = res[0]['objectGUID'][0]
882         return self.ldb_admin.schema_format_value("objectGUID", guid).decode('utf-8')
883
884     def make_attr_preserve_on_delete(self):
885         """Marks the attribute under test as being preserve on delete"""
886
887         # work out the original 'searchFlags' value before we overwrite it
888         search_flags = int(self.get_attr_search_flags(self.attr_dn))
889
890         # check we've already set the confidential flag
891         self.assertTrue(search_flags & SEARCH_FLAG_CONFIDENTIAL != 0)
892         search_flags |= SEARCH_FLAG_PRESERVEONDELETE
893
894         self.set_attr_search_flags(self.attr_dn, str(search_flags))
895
896     def change_attr_under_test(self, attr_name, attr_cn):
897         # change the attribute that the test code uses
898         self.conf_attr = attr_name
899         self.attr_dn = "{0},{1}".format(attr_cn, self.schema_dn)
900
901         # set the new attribute for the user-under-test
902         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
903
904         # 2 other users also have the attribute-under-test set (to a randomish
905         # value). Set the new attribute for them now (normally this gets done
906         # in the setUp())
907         for username in self.all_users:
908             if "other-user" in username:
909                 dn = self.get_user_dn(username)
910                 self.add_attr(dn, self.conf_attr, "xyz-blah")
911
912     def test_search_with_dirsync_deleted_objects(self):
913         """Checks dirsync doesn't reveal confidential info for deleted objs"""
914
915         # change the attribute we're testing (we'll preserve on delete for this
916         # test case, which means the attribute-under-test hangs around after
917         # the test case finishes, and would interfere with the searches for
918         # subsequent other test cases)
919         self.change_attr_under_test("carLicense", "CN=carLicense")
920
921         # Windows dirsync behaviour is a little strange when you request
922         # attributes that deleted objects no longer have, so just request 'all
923         # attributes' to simplify the test logic
924         self.attr_filters = [None, ["*"]]
925
926         # normally dirsync uses extra filters to exclude deleted objects that
927         # we're not interested in. Override these filters so they WILL include
928         # deleted objects, but only from this particular test run. We can do
929         # this by matching lastKnownParent against this test case's OU, which
930         # will match any deleted child objects.
931         ou_guid = self.get_guid_string(self.ou)
932         deleted_filter = "(lastKnownParent=<GUID={0}>)".format(ou_guid)
933
934         # the extra-filter will get combined via AND with the search expression
935         # we're testing, i.e. filter on the confidential attribute AND only
936         # include non-deleted objects, OR deleted objects from this test run
937         exclude_deleted_objs_filter = self.extra_filter
938         self.extra_filter = "(|{0}{1})".format(exclude_deleted_objs_filter,
939                                                deleted_filter)
940
941         # for matching on a single object, the search expresseion becomes:
942         # match exactly by account-name AND either a non-deleted object OR a
943         # deleted object from this test run
944         match_by_name = "(samaccountname={0})".format(self.conf_user)
945         not_deleted = "(!(isDeleted=*))"
946         self.single_obj_filter = "(&{0}(|{1}{2}))".format(match_by_name,
947                                                           not_deleted,
948                                                           deleted_filter)
949
950         # check that the search filters work as expected
951         self.assert_conf_attr_searches(has_rights_to="all")
952         self.assert_attr_visible(expect_attr=True)
953         self.assert_negative_searches(has_rights_to="all")
954
955         # make the test attribute confidential *and* preserve on delete.
956         self.make_attr_confidential()
957         self.make_attr_preserve_on_delete()
958
959         # check we can't see the objects now, even with using dirsync controls
960         self.assert_conf_attr_searches(has_rights_to=0)
961         self.assert_attr_visible(expect_attr=False)
962         self.assert_negative_searches(has_rights_to=0)
963
964         # now delete the users (except for the user whose LDB connection
965         # we're currently using)
966         for user in self.all_users:
967             if user != self.user:
968                 self.ldb_admin.delete(self.get_user_dn(user))
969
970         # check we still can't see the objects
971         self.assert_conf_attr_searches(has_rights_to=0)
972         self.assert_negative_searches(has_rights_to=0)
973
974     def test_timing_attack(self):
975         # Create the machine account.
976         mach_name = f'conf_timing_{random.randint(0, 0xffff)}'
977         mach_dn = Dn(self.ldb_admin, f'CN={mach_name},{self.ou}')
978         details = {
979             'dn': mach_dn,
980             'objectclass': 'computer',
981             'sAMAccountName': f'{mach_name}$',
982         }
983         self.ldb_admin.add(details)
984
985         # Get the machine account's GUID.
986         res = self.ldb_admin.search(mach_dn,
987                                     attrs=['objectGUID'],
988                                     scope=SCOPE_BASE)
989         mach_guid = res[0].get('objectGUID', idx=0)
990
991         # Now we can create an msFVE-RecoveryInformation object that is a child
992         # of the machine account object.
993         recovery_dn = Dn(self.ldb_admin, str(mach_dn))
994         recovery_dn.add_child('CN=recovery_info')
995
996         secret_pw = 'Secret007'
997         not_secret_pw = 'Secret008'
998
999         secret_pw_utf8 = secret_pw.encode('utf-8')
1000
1001         # The crucial attribute, msFVE-RecoveryPassword, is a confidential
1002         # attribute.
1003         conf_attr = 'msFVE-RecoveryPassword'
1004
1005         m = Message(recovery_dn)
1006         m['objectClass'] = 'msFVE-RecoveryInformation'
1007         m['msFVE-RecoveryGuid'] = mach_guid
1008         m[conf_attr] = secret_pw
1009         self.ldb_admin.add(m)
1010
1011         attrs = [conf_attr]
1012
1013         # Search for the confidential attribute as administrator, ensuring it
1014         # is visible.
1015         res = self.ldb_admin.search(recovery_dn,
1016                                     attrs=attrs,
1017                                     scope=SCOPE_BASE)
1018         self.assertEqual(1, len(res))
1019         pw = res[0].get(conf_attr, idx=0)
1020         self.assertEqual(secret_pw_utf8, pw)
1021
1022         # Repeat the search with an expression matching on the confidential
1023         # attribute. This should also work.
1024         res = self.ldb_admin.search(
1025             recovery_dn,
1026             attrs=attrs,
1027             expression=f'({conf_attr}={secret_pw})',
1028             scope=SCOPE_BASE)
1029         self.assertEqual(1, len(res))
1030         pw = res[0].get(conf_attr, idx=0)
1031         self.assertEqual(secret_pw_utf8, pw)
1032
1033         # Search for the attribute as an unprivileged user. It should not be
1034         # visible.
1035         user_res = self.ldb_user.search(recovery_dn,
1036                                         attrs=attrs,
1037                                         scope=SCOPE_BASE)
1038         pw = user_res[0].get(conf_attr, idx=0)
1039         # The attribute should be None.
1040         self.assertIsNone(pw)
1041
1042         # We use LDAP_MATCHING_RULE_TRANSITIVE_EVAL to create a search
1043         # expression that takes a long time to execute, by setting off another
1044         # search each time it is evaluated. It makes no difference that the
1045         # object on which we're searching has no 'member' attribute.
1046         dummy_dn = 'cn=user,cn=users,dc=samba,dc=example,dc=com'
1047         slow_subexpr = f'(member:1.2.840.113556.1.4.1941:={dummy_dn})'
1048         slow_expr = f'(|{slow_subexpr * 100})'
1049
1050         # The full search expression. It comprises a match on the confidential
1051         # attribute joined by an AND to our slow search expression, The AND
1052         # operator is short-circuiting, so if our first subexpression fails to
1053         # match, we'll bail out of the search early. Otherwise, we'll evaluate
1054         # the slow part; as its subexpressions are joined by ORs, and will all
1055         # fail to match, every one of them will need to be evaluated. By
1056         # measuring how long the search takes, we'll be able to infer whether
1057         # the confidential attribute matched or not.
1058
1059         # This is bad if we are not an administrator, and are able to use this
1060         # to determine the values of confidential attributes. Therefore we need
1061         # to ensure we can't observe any difference in timing.
1062         correct_expr = f'(&({conf_attr}={secret_pw}){slow_expr})'
1063         wrong_expr = f'(&({conf_attr}={not_secret_pw}){slow_expr})'
1064
1065         def standard_uncertainty_bounds(times):
1066             mean = statistics.mean(times)
1067             stdev = statistics.stdev(times, mean)
1068
1069             return (mean - stdev, mean + stdev)
1070
1071         # Perform a number of searches with both correct and incorrect
1072         # expressions, and return the uncertainty bounds for each.
1073         def time_searches(samdb):
1074             warmup_samples = 3
1075             samples = 10
1076             matching_times = []
1077             non_matching_times = []
1078
1079             for _ in range(warmup_samples):
1080                 samdb.search(recovery_dn,
1081                              attrs=attrs,
1082                              expression=correct_expr,
1083                              scope=SCOPE_BASE)
1084
1085             for _ in range(samples):
1086                 # Measure the time taken for a search, for both a matching and
1087                 # a non-matching search expression.
1088
1089                 prev = time.time()
1090                 samdb.search(recovery_dn,
1091                              attrs=attrs,
1092                              expression=correct_expr,
1093                              scope=SCOPE_BASE)
1094                 now = time.time()
1095                 matching_times.append(now - prev)
1096
1097                 prev = time.time()
1098                 samdb.search(recovery_dn,
1099                              attrs=attrs,
1100                              expression=wrong_expr,
1101                              scope=SCOPE_BASE)
1102                 now = time.time()
1103                 non_matching_times.append(now - prev)
1104
1105             matching = standard_uncertainty_bounds(matching_times)
1106             non_matching = standard_uncertainty_bounds(non_matching_times)
1107             return matching, non_matching
1108
1109         def assertRangesDistinct(a, b):
1110             a0, a1 = a
1111             b0, b1 = b
1112             self.assertLess(min(a1, b1), max(a0, b0))
1113
1114         def assertRangesOverlap(a, b):
1115             a0, a1 = a
1116             b0, b1 = b
1117             self.assertGreaterEqual(min(a1, b1), max(a0, b0))
1118
1119         # For an administrator, the uncertainty bounds for matching and
1120         # non-matching searches should be distinct. This shows that the two
1121         # cases are distinguishable, and therefore that confidential attributes
1122         # are visible.
1123         admin_matching, admin_non_matching = time_searches(self.ldb_admin)
1124         assertRangesDistinct(admin_matching, admin_non_matching)
1125
1126         # The user cannot view the confidential attribute, so the uncertainty
1127         # bounds for matching and non-matching searches must overlap. The two
1128         # cases must be indistinguishable.
1129         user_matching, user_non_matching = time_searches(self.ldb_user)
1130         assertRangesOverlap(user_matching, user_non_matching)
1131
1132
1133 TestProgram(module=__name__, opts=subunitopts)