eb75da6374fe2cfe04e86c54c8c20d89a1301215
[samba.git] / source4 / dsdb / tests / python / confidential_attr.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Tests that confidential attributes (or attributes protected by a ACL that
5 # denies read access) cannot be guessed through wildcard DB searches.
6 #
7 # Copyright (C) Catalyst.Net Ltd
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25
26 import samba
27 import random
28 import statistics
29 import time
30 from samba.tests.subunitrun import SubunitOptions, TestProgram
31 import samba.getopt as options
32 from ldb import SCOPE_BASE, SCOPE_SUBTREE
33 from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_PRESERVEONDELETE
34 from ldb import Message, MessageElement, Dn
35 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
36 from samba.auth import system_session
37 from samba import gensec, sd_utils
38 from samba.samdb import SamDB
39 from samba.credentials import Credentials, DONT_USE_KERBEROS
40 import samba.tests
41 import samba.dsdb
42
43 parser = optparse.OptionParser("confidential_attr.py [options] <host>")
44 sambaopts = options.SambaOptions(parser)
45 parser.add_option_group(sambaopts)
46 parser.add_option_group(options.VersionOptions(parser))
47
48 # use command line creds if available
49 credopts = options.CredentialsOptions(parser)
50 parser.add_option_group(credopts)
51 subunitopts = SubunitOptions(parser)
52 parser.add_option_group(subunitopts)
53
54 opts, args = parser.parse_args()
55
56 if len(args) < 1:
57     parser.print_usage()
58     sys.exit(1)
59
60 host = args[0]
61 if "://" not in host:
62     ldaphost = "ldap://%s" % host
63 else:
64     ldaphost = host
65     start = host.rindex("://")
66     host = host.lstrip(start + 3)
67
68 lp = sambaopts.get_loadparm()
69 creds = credopts.get_credentials(lp)
70 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
71
72 #
73 # Tests start here
74 #
75 class ConfidentialAttrCommon(samba.tests.TestCase):
76
77     def setUp(self):
78         super(ConfidentialAttrCommon, self).setUp()
79
80         self.ldb_admin = SamDB(ldaphost, credentials=creds,
81                                session_info=system_session(lp), lp=lp)
82         self.user_pass = "samba123@"
83         self.base_dn = self.ldb_admin.domain_dn()
84         self.schema_dn = self.ldb_admin.get_schema_basedn()
85         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
86
87         # the tests work by setting the 'Confidential' bit in the searchFlags
88         # for an existing schema attribute. This only works against Windows if
89         # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
90         # schema attribute being modified. There are only a few attributes that
91         # meet this criteria (most of which only apply to 'user' objects)
92         self.conf_attr = "homePostalAddress"
93         attr_cn = "CN=Address-Home"
94         # schemaIdGuid for homePostalAddress (used for ACE tests)
95         self.conf_attr_guid = "16775781-47f3-11d1-a9c3-0000f80367c1"
96         self.conf_attr_sec_guid = "77b5b886-944a-11d1-aebd-0000f80367c1"
97         self.attr_dn = "{0},{1}".format(attr_cn, self.schema_dn)
98
99         userou = "OU=conf-attr-test"
100         self.ou = "{0},{1}".format(userou, self.base_dn)
101         self.ldb_admin.create_ou(self.ou)
102
103         # use a common username prefix, so we can use sAMAccountName=CATC-* as
104         # a search filter to only return the users we're interested in
105         self.user_prefix = "catc-"
106
107         # add a test object with this attribute set
108         self.conf_value = "abcdef"
109         self.conf_user = "{0}conf-user".format(self.user_prefix)
110         self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
111         self.conf_dn = self.get_user_dn(self.conf_user)
112         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
113
114         # add a sneaky user that will try to steal our secrets
115         self.user = "{0}sneaky-user".format(self.user_prefix)
116         self.ldb_admin.newuser(self.user, self.user_pass, userou=userou)
117         self.ldb_user = self.get_ldb_connection(self.user, self.user_pass)
118
119         self.all_users = [self.user, self.conf_user]
120
121         # add some other users that also have confidential attributes, so we
122         # check we don't disclose their details, particularly in '!' searches
123         for i in range(1, 3):
124             username = "{0}other-user{1}".format(self.user_prefix, i)
125             self.ldb_admin.newuser(username, self.user_pass, userou=userou)
126             userdn = self.get_user_dn(username)
127             self.add_attr(userdn, self.conf_attr, "xyz{0}".format(i))
128             self.all_users.append(username)
129
130         # there are 4 users in the OU, plus the OU itself
131         self.test_dn = self.ou
132         self.total_objects = len(self.all_users) + 1
133         self.objects_with_attr = 3
134
135         # sanity-check the flag is not already set (this'll cause problems if
136         # previous test run didn't clean up properly)
137         search_flags = self.get_attr_search_flags(self.attr_dn)
138         self.assertEqual(0, int(search_flags) & SEARCH_FLAG_CONFIDENTIAL,
139                          "{0} searchFlags already {1}".format(self.conf_attr,
140                                                               search_flags))
141
142     def tearDown(self):
143         super(ConfidentialAttrCommon, self).tearDown()
144         self.ldb_admin.delete(self.ou, ["tree_delete:1"])
145
146     def add_attr(self, dn, attr, value):
147         m = Message()
148         m.dn = Dn(self.ldb_admin, dn)
149         m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
150         self.ldb_admin.modify(m)
151
152     def set_attr_search_flags(self, attr_dn, flags):
153         """Modifies the searchFlags for an object in the schema"""
154         m = Message()
155         m.dn = Dn(self.ldb_admin, attr_dn)
156         m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
157                                           'searchFlags')
158         self.ldb_admin.modify(m)
159
160         # note we have to update the schema for this change to take effect (on
161         # Windows, at least)
162         self.ldb_admin.set_schema_update_now()
163
164     def get_attr_search_flags(self, attr_dn):
165         """Marks the attribute under test as being confidential"""
166         res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
167                                     attrs=['searchFlags'])
168         return res[0]['searchFlags'][0]
169
170     def make_attr_confidential(self):
171         """Marks the attribute under test as being confidential"""
172
173         # work out the original 'searchFlags' value before we overwrite it
174         old_value = self.get_attr_search_flags(self.attr_dn)
175
176         self.set_attr_search_flags(self.attr_dn, str(SEARCH_FLAG_CONFIDENTIAL))
177
178         # reset the value after the test completes
179         self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
180
181     def get_user_dn(self, name):
182         return "CN={0},{1}".format(name, self.ou)
183
184     def get_user_sid_string(self, username):
185         user_dn = self.get_user_dn(username)
186         user_sid = self.sd_utils.get_object_sid(user_dn)
187         return str(user_sid)
188
189     def get_ldb_connection(self, target_username, target_password):
190         creds_tmp = Credentials()
191         creds_tmp.set_username(target_username)
192         creds_tmp.set_password(target_password)
193         creds_tmp.set_domain(creds.get_domain())
194         creds_tmp.set_realm(creds.get_realm())
195         creds_tmp.set_workstation(creds.get_workstation())
196         features = creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL
197         creds_tmp.set_gensec_features(features)
198         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
199         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
200         return ldb_target
201
202     def assert_search_result(self, expected_num, expr, samdb):
203
204         # try asking for different attributes back: None/all, the confidential
205         # attribute itself, and a random unrelated attribute
206         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
207         for attr in attr_filters:
208             res = samdb.search(self.test_dn, expression=expr,
209                                scope=SCOPE_SUBTREE, attrs=attr)
210             self.assertEqual(len(res), expected_num,
211                              "%u results, not %u for search %s, attr %s" %
212                              (len(res), expected_num, expr, str(attr)))
213
214     # return a selection of searches that match exactly against the test object
215     def get_exact_match_searches(self):
216         first_char = self.conf_value[:1]
217         last_char = self.conf_value[-1:]
218         test_attr = self.conf_attr
219
220         searches = [
221             # search for the attribute using a sub-string wildcard
222             # (which could reveal the attribute's actual value)
223             "({0}={1}*)".format(test_attr, first_char),
224             "({0}=*{1})".format(test_attr, last_char),
225
226             # sanity-check equality against an exact match on value
227             "({0}={1})".format(test_attr, self.conf_value),
228
229             # '~=' searches don't work against Samba
230             # sanity-check an approx search against an exact match on value
231             # "({0}~={1})".format(test_attr, self.conf_value),
232
233             # check wildcard in an AND search...
234             "(&({0}={1}*)(objectclass=*))".format(test_attr, first_char),
235
236             # ...an OR search (against another term that will never match)
237             "(|({0}={1}*)(objectclass=banana))".format(test_attr, first_char)]
238
239         return searches
240
241     # return searches that match any object with the attribute under test
242     def get_match_all_searches(self):
243         searches = [
244             # check a full wildcard against the confidential attribute
245             # (which could reveal the attribute's presence/absence)
246             "({0}=*)".format(self.conf_attr),
247
248             # check wildcard in an AND search...
249             "(&(objectclass=*)({0}=*))".format(self.conf_attr),
250
251             # ...an OR search (against another term that will never match)
252             "(|(objectclass=banana)({0}=*))".format(self.conf_attr),
253
254             # check <=, and >= expressions that would normally find a match
255             "({0}>=0)".format(self.conf_attr),
256             "({0}<=ZZZZZZZZZZZ)".format(self.conf_attr)]
257
258         return searches
259
260     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
261         """Check searches against the attribute under test work as expected"""
262
263         if samdb is None:
264             samdb = self.ldb_user
265
266         if has_rights_to == "all":
267             has_rights_to = self.objects_with_attr
268
269         # these first few searches we just expect to match against the one
270         # object under test that we're trying to guess the value of
271         expected_num = 1 if has_rights_to > 0 else 0
272         for search in self.get_exact_match_searches():
273             self.assert_search_result(expected_num, search, samdb)
274
275         # these next searches will match any objects we have rights to see
276         expected_num = has_rights_to
277         for search in self.get_match_all_searches():
278             self.assert_search_result(expected_num, search, samdb)
279
280     # The following are double negative searches (i.e. NOT non-matching-
281     # condition) which will therefore match ALL objects, including the test
282     # object(s).
283     def get_negative_match_all_searches(self):
284         first_char = self.conf_value[:1]
285         last_char = self.conf_value[-1:]
286         not_first_char = chr(ord(first_char) + 1)
287         not_last_char = chr(ord(last_char) + 1)
288
289         searches = [
290             "(!({0}={1}*))".format(self.conf_attr, not_first_char),
291             "(!({0}=*{1}))".format(self.conf_attr, not_last_char)]
292         return searches
293
294     # the following searches will not match against the test object(s). So
295     # a user with sufficient rights will see an inverse sub-set of objects.
296     # (An unprivileged user would either see all objects on Windows, or no
297     # objects on Samba)
298     def get_inverse_match_searches(self):
299         first_char = self.conf_value[:1]
300         last_char = self.conf_value[-1:]
301         searches = [
302             "(!({0}={1}*))".format(self.conf_attr, first_char),
303             "(!({0}=*{1}))".format(self.conf_attr, last_char)]
304         return searches
305
306     def negative_searches_all_rights(self, total_objects=None):
307         expected_results = {}
308
309         if total_objects is None:
310             total_objects = self.total_objects
311
312         # these searches should match ALL objects (including the OU)
313         for search in self.get_negative_match_all_searches():
314             expected_results[search] = total_objects
315
316         # a ! wildcard should only match the objects without the attribute
317         search = "(!({0}=*))".format(self.conf_attr)
318         expected_results[search] = total_objects - self.objects_with_attr
319
320         # whereas the inverse searches should match all objects *except* the
321         # one under test
322         for search in self.get_inverse_match_searches():
323             expected_results[search] = total_objects - 1
324
325         return expected_results
326
327     # Returns the expected negative (i.e. '!') search behaviour when talking to
328     # a DC, i.e. we assert that users
329     # without rights always see ALL objects in '!' searches
330     def negative_searches_return_all(self, has_rights_to=0,
331                                      total_objects=None):
332         """Asserts user without rights cannot see objects in '!' searches"""
333         expected_results = {}
334
335         if total_objects is None:
336             total_objects = self.total_objects
337
338         # Windows 'hides' objects by always returning all of them, so negative
339         # searches that match all objects will simply return all objects
340         for search in self.get_negative_match_all_searches():
341             expected_results[search] = total_objects
342
343         # if we're matching on everything except the one object under test
344         # (i.e. the inverse subset), we'll still see all objects if
345         # has_rights_to == 0. Or we'll see all bar one if has_rights_to == 1.
346         inverse_searches = self.get_inverse_match_searches()
347         inverse_searches += ["(!({0}=*))".format(self.conf_attr)]
348
349         for search in inverse_searches:
350             expected_results[search] = total_objects - has_rights_to
351
352         return expected_results
353
354     # Returns the expected negative (i.e. '!') search behaviour. This varies
355     # depending on what type of DC we're talking to (i.e. Windows or Samba)
356     # and what access rights the user has.
357     # Note we only handle has_rights_to="all", 1 (the test object), or 0 (i.e.
358     # we don't have rights to any objects)
359     def negative_search_expected_results(self, has_rights_to, total_objects=None):
360
361         if has_rights_to == "all":
362             expect_results = self.negative_searches_all_rights(total_objects)
363
364         else:
365             expect_results = self.negative_searches_return_all(has_rights_to,
366                                                                total_objects)
367         return expect_results
368
369     def assert_negative_searches(self, has_rights_to=0, samdb=None):
370         """Asserts user without rights cannot see objects in '!' searches"""
371
372         if samdb is None:
373             samdb = self.ldb_user
374
375         # build a dictionary of key=search-expr, value=expected_num assertions
376         expected_results = self.negative_search_expected_results(has_rights_to)
377
378         for search, expected_num in expected_results.items():
379             self.assert_search_result(expected_num, search, samdb)
380
381     def assert_attr_returned(self, expect_attr, samdb, attrs):
382         # does a query that should always return a successful result, and
383         # checks whether the confidential attribute is present
384         res = samdb.search(self.conf_dn, expression="(objectClass=*)",
385                            scope=SCOPE_SUBTREE, attrs=attrs)
386         self.assertEqual(1, len(res))
387
388         attr_returned = False
389         for msg in res:
390             if self.conf_attr in msg:
391                 attr_returned = True
392         self.assertEqual(expect_attr, attr_returned)
393
394     def assert_attr_visible(self, expect_attr, samdb=None):
395         if samdb is None:
396             samdb = self.ldb_user
397
398         # sanity-check confidential attribute is/isn't returned as expected
399         # based on the filter attributes we ask for
400         self.assert_attr_returned(expect_attr, samdb, attrs=None)
401         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
402         self.assert_attr_returned(expect_attr, samdb, attrs=[self.conf_attr])
403
404         # filtering on a different attribute should never return the conf_attr
405         self.assert_attr_returned(expect_attr=False, samdb=samdb,
406                                   attrs=['name'])
407
408     def assert_attr_visible_to_admin(self):
409         # sanity-check the admin user can always see the confidential attribute
410         self.assert_conf_attr_searches(has_rights_to="all",
411                                        samdb=self.ldb_admin)
412         self.assert_negative_searches(has_rights_to="all",
413                                       samdb=self.ldb_admin)
414         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
415
416
417 class ConfidentialAttrTest(ConfidentialAttrCommon):
418     def test_basic_search(self):
419         """Basic test confidential attributes aren't disclosed via searches"""
420
421         # check we can see a non-confidential attribute in a basic searches
422         self.assert_conf_attr_searches(has_rights_to="all")
423         self.assert_negative_searches(has_rights_to="all")
424         self.assert_attr_visible(expect_attr=True)
425
426         # now make the attribute confidential. Repeat the tests and check that
427         # an ordinary user can't see the attribute, or indirectly match on the
428         # attribute via the search expression
429         self.make_attr_confidential()
430
431         self.assert_conf_attr_searches(has_rights_to=0)
432         self.assert_negative_searches(has_rights_to=0)
433         self.assert_attr_visible(expect_attr=False)
434
435         # sanity-check we haven't hidden the attribute from the admin as well
436         self.assert_attr_visible_to_admin()
437
438     def _test_search_with_allow_acl(self, allow_ace):
439         """Checks a ACE with 'CR' rights can override a confidential attr"""
440         # make the test attribute confidential and check user can't see it
441         self.make_attr_confidential()
442
443         self.assert_conf_attr_searches(has_rights_to=0)
444         self.assert_negative_searches(has_rights_to=0)
445         self.assert_attr_visible(expect_attr=False)
446
447         # apply the allow ACE to the object under test
448         self.sd_utils.dacl_add_ace(self.conf_dn, allow_ace)
449
450         # the user should now be able to see the attribute for the one object
451         # we gave it rights to
452         self.assert_conf_attr_searches(has_rights_to=1)
453         self.assert_negative_searches(has_rights_to=1)
454         self.assert_attr_visible(expect_attr=True)
455
456         # sanity-check the admin can still see the attribute
457         self.assert_attr_visible_to_admin()
458
459     def test_search_with_attr_acl_override(self):
460         """Make the confidential attr visible via an OA attr ACE"""
461
462         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
463         # attribute under test, so the user can see it once more
464         user_sid = self.get_user_sid_string(self.user)
465         ace = "(OA;;CR;{0};;{1})".format(self.conf_attr_guid, user_sid)
466
467         self._test_search_with_allow_acl(ace)
468
469     def test_search_with_propset_acl_override(self):
470         """Make the confidential attr visible via a Property-set ACE"""
471
472         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
473         # property-set containing the attribute under test (i.e. the
474         # attributeSecurityGuid), so the user can see it once more
475         user_sid = self.get_user_sid_string(self.user)
476         ace = "(OA;;CR;{0};;{1})".format(self.conf_attr_sec_guid, user_sid)
477
478         self._test_search_with_allow_acl(ace)
479
480     def test_search_with_acl_override(self):
481         """Make the confidential attr visible via a general 'allow' ACE"""
482
483         # set the allow SEC_ADS_CONTROL_ACCESS bit ('CR') for the user
484         user_sid = self.get_user_sid_string(self.user)
485         ace = "(A;;CR;;;{0})".format(user_sid)
486
487         self._test_search_with_allow_acl(ace)
488
489     def test_search_with_blanket_oa_acl(self):
490         """Make the confidential attr visible via a non-specific OA ACE"""
491
492         # this just checks that an Object Access (OA) ACE without a GUID
493         # specified will work the same as an 'Access' (A) ACE
494         user_sid = self.get_user_sid_string(self.user)
495         ace = "(OA;;CR;;;{0})".format(user_sid)
496
497         self._test_search_with_allow_acl(ace)
498
499     def _test_search_with_neutral_acl(self, neutral_ace):
500         """Checks that a user does NOT gain access via an unrelated ACE"""
501
502         # make the test attribute confidential and check user can't see it
503         self.make_attr_confidential()
504
505         self.assert_conf_attr_searches(has_rights_to=0)
506         self.assert_negative_searches(has_rights_to=0)
507         self.assert_attr_visible(expect_attr=False)
508
509         # apply the ACE to the object under test
510         self.sd_utils.dacl_add_ace(self.conf_dn, neutral_ace)
511
512         # this should make no difference to the user's ability to see the attr
513         self.assert_conf_attr_searches(has_rights_to=0)
514         self.assert_negative_searches(has_rights_to=0)
515         self.assert_attr_visible(expect_attr=False)
516
517         # sanity-check the admin can still see the attribute
518         self.assert_attr_visible_to_admin()
519
520     def test_search_with_neutral_acl(self):
521         """Give the user all rights *except* CR for any attributes"""
522
523         # give the user all rights *except* CR and check it makes no difference
524         user_sid = self.get_user_sid_string(self.user)
525         ace = "(A;;RPWPCCDCLCLORCWOWDSDDTSW;;;{0})".format(user_sid)
526         self._test_search_with_neutral_acl(ace)
527
528     def test_search_with_neutral_attr_acl(self):
529         """Give the user all rights *except* CR for the attribute under test"""
530
531         # giving user all OA rights *except* CR should make no difference
532         user_sid = self.get_user_sid_string(self.user)
533         rights = "RPWPCCDCLCLORCWOWDSDDTSW"
534         ace = "(OA;;{0};{1};;{2})".format(rights, self.conf_attr_guid, user_sid)
535         self._test_search_with_neutral_acl(ace)
536
537     def test_search_with_neutral_cr_acl(self):
538         """Give the user CR rights for *another* unrelated attribute"""
539
540         # giving user object-access CR rights to an unrelated attribute
541         user_sid = self.get_user_sid_string(self.user)
542         # use the GUID for sAMAccountName here (for no particular reason)
543         unrelated_attr = "3e0abfd0-126a-11d0-a060-00aa006c33ed"
544         ace = "(OA;;CR;{0};;{1})".format(unrelated_attr, user_sid)
545         self._test_search_with_neutral_acl(ace)
546
547
548 # Check that a Deny ACL on an attribute doesn't reveal confidential info
549 class ConfidentialAttrTestDenyAcl(ConfidentialAttrCommon):
550
551     def assert_not_in_result(self, res, exclude_dn):
552         for msg in res:
553             self.assertNotEqual(msg.dn, exclude_dn,
554                                 "Search revealed object {0}".format(exclude_dn))
555
556     # deny ACL tests are slightly different as we are only denying access to
557     # the one object under test (rather than any objects with that attribute).
558     # Therefore we need an extra check that we don't reveal the test object
559     # in the search, if we're not supposed to
560     def assert_search_result(self, expected_num, expr, samdb,
561                              excl_testobj=False):
562
563         # try asking for different attributes back: None/all, the confidential
564         # attribute itself, and a random unrelated attribute
565         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
566         for attr in attr_filters:
567             res = samdb.search(self.test_dn, expression=expr,
568                                scope=SCOPE_SUBTREE, attrs=attr)
569             self.assertEqual(len(res), expected_num,
570                              "%u results, not %u for search %s, attr %s" %
571                              (len(res), expected_num, expr, str(attr)))
572
573             # assert we haven't revealed the hidden test-object
574             if excl_testobj:
575                 self.assert_not_in_result(res, exclude_dn=self.conf_dn)
576
577     # we make a few tweaks to the regular version of this function to cater to
578     # denying specifically one object via an ACE
579     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
580         """Check searches against the attribute under test work as expected"""
581
582         if samdb is None:
583             samdb = self.ldb_user
584
585         # make sure the test object is not returned if we've been denied rights
586         # to it via an ACE
587         excl_testobj = has_rights_to == "deny-one"
588
589         # these first few searches we just expect to match against the one
590         # object under test that we're trying to guess the value of
591         expected_num = 1 if has_rights_to == "all" else 0
592
593         for search in self.get_exact_match_searches():
594             self.assert_search_result(expected_num, search, samdb,
595                                       excl_testobj)
596
597         # these next searches will match any objects with the attribute that
598         # we have rights to see (i.e. all except the object under test)
599         if has_rights_to == "all":
600             expected_num = self.objects_with_attr
601         elif has_rights_to == "deny-one":
602             expected_num = self.objects_with_attr - 1
603
604         for search in self.get_match_all_searches():
605             self.assert_search_result(expected_num, search, samdb,
606                                       excl_testobj)
607
608     # override method specifically for deny ACL test cases
609     def negative_searches_return_all(self, has_rights_to=0,
610                                      total_objects=None):
611         expected_results = {}
612
613         # When a user lacks access rights to an object, Windows 'hides' it in
614         # '!' searches by always returning it, regardless of whether it matches
615         searches = self.get_negative_match_all_searches()
616         searches += self.get_inverse_match_searches()
617         for search in searches:
618             expected_results[search] = self.total_objects
619
620         # in the wildcard case, the one object we don't have rights to gets
621         # bundled in with the objects that don't have the attribute at all
622         search = "(!({0}=*))".format(self.conf_attr)
623         has_rights_to = self.objects_with_attr - 1
624         expected_results[search] = self.total_objects - has_rights_to
625         return expected_results
626
627     # override method specifically for deny ACL test cases
628     def assert_negative_searches(self, has_rights_to=0, samdb=None):
629         """Asserts user without rights cannot see objects in '!' searches"""
630
631         if samdb is None:
632             samdb = self.ldb_user
633
634         # As the deny ACL is only denying access to one particular object, add
635         # an extra check that the denied object is not returned. (We can only
636         # assert this if the '!'/negative search behaviour is to suppress any
637         # objects we don't have access rights to)
638         excl_testobj = False
639
640         # build a dictionary of key=search-expr, value=expected_num assertions
641         expected_results = self.negative_search_expected_results(has_rights_to)
642
643         for search, expected_num in expected_results.items():
644             self.assert_search_result(expected_num, search, samdb,
645                                       excl_testobj=excl_testobj)
646
647     def _test_search_with_deny_acl(self, ace):
648         # check the user can see the attribute initially
649         self.assert_conf_attr_searches(has_rights_to="all")
650         self.assert_negative_searches(has_rights_to="all")
651         self.assert_attr_visible(expect_attr=True)
652
653         # add the ACE that denies access to the attr under test
654         self.sd_utils.dacl_add_ace(self.conf_dn, ace)
655
656         # the user shouldn't be able to see the attribute anymore
657         self.assert_conf_attr_searches(has_rights_to="deny-one")
658         self.assert_negative_searches(has_rights_to="deny-one")
659         self.assert_attr_visible(expect_attr=False)
660
661         # sanity-check we haven't hidden the attribute from the admin as well
662         self.assert_attr_visible_to_admin()
663
664     def test_search_with_deny_attr_acl(self):
665         """Checks a deny ACE works the same way as a confidential attribute"""
666
667         # add an ACE that denies the user Read Property (RP) access to the attr
668         # (which is similar to making the attribute confidential)
669         user_sid = self.get_user_sid_string(self.user)
670         ace = "(OD;;RP;{0};;{1})".format(self.conf_attr_guid, user_sid)
671
672         # check the user cannot see the attribute anymore
673         self._test_search_with_deny_acl(ace)
674
675     def test_search_with_deny_acl(self):
676         """Checks a blanket deny ACE denies access to an object's attributes"""
677
678         # add an blanket deny ACE for Read Property (RP) rights
679         user_dn = self.get_user_dn(self.user)
680         user_sid = self.sd_utils.get_object_sid(user_dn)
681         ace = "(D;;RP;;;{0})".format(str(user_sid))
682
683         # check the user cannot see the attribute anymore
684         self._test_search_with_deny_acl(ace)
685
686     def test_search_with_deny_propset_acl(self):
687         """Checks a deny ACE on the attribute's Property-Set"""
688
689         # add an blanket deny ACE for Read Property (RP) rights
690         user_sid = self.get_user_sid_string(self.user)
691         ace = "(OD;;RP;{0};;{1})".format(self.conf_attr_sec_guid, user_sid)
692
693         # check the user cannot see the attribute anymore
694         self._test_search_with_deny_acl(ace)
695
696     def test_search_with_blanket_oa_deny_acl(self):
697         """Checks a non-specific 'OD' ACE works the same as a 'D' ACE"""
698
699         # this just checks that adding a 'Object Deny' (OD) ACE without
700         # specifying a GUID will work the same way as a 'Deny' (D) ACE
701         user_sid = self.get_user_sid_string(self.user)
702         ace = "(OD;;RP;;;{0})".format(user_sid)
703
704         # check the user cannot see the attribute anymore
705         self._test_search_with_deny_acl(ace)
706
707
708 # Check that using the dirsync controls doesn't reveal confidential attributes
709 class ConfidentialAttrTestDirsync(ConfidentialAttrCommon):
710
711     def setUp(self):
712         super(ConfidentialAttrTestDirsync, self).setUp()
713         self.dirsync = ["dirsync:1:1:1000"]
714
715         # because we need to search on the base DN when using the dirsync
716         # controls, we need an extra filter for the inverse ('!') search,
717         # so we don't get thousands of objects returned
718         self.extra_filter = \
719             "(&(samaccountname={0}*)(!(isDeleted=*)))".format(self.user_prefix)
720         self.single_obj_filter = \
721             "(&(samaccountname={0})(!(isDeleted=*)))".format(self.conf_user)
722
723         self.attr_filters = [None, ["*"], ["name"]]
724
725         # Note dirsync behaviour is slighty different for the attribute under
726         # test - when you have full access rights, it only returns the objects
727         # that actually have this attribute (i.e. it doesn't return an empty
728         # message with just the DN). So we add the 'name' attribute into the
729         # attribute filter to avoid complicating our assertions further
730         self.attr_filters += [[self.conf_attr, "name"]]
731
732     # override method specifically for dirsync, i.e. add dirsync controls
733     def assert_search_result(self, expected_num, expr, samdb, base_dn=None):
734
735         # Note dirsync must always search on the partition base DN
736         base_dn = self.base_dn
737
738         # we need an extra filter for dirsync because:
739         # - we search on the base DN, so otherwise the '!' searches return
740         #   thousands of unrelated results, and
741         # - we make the test attribute preserve-on-delete in one case, so we
742         #   want to weed out results from any previous test runs
743         search = "(&{0}{1})".format(expr, self.extra_filter)
744
745         for attr in self.attr_filters:
746             res = samdb.search(base_dn, expression=search, scope=SCOPE_SUBTREE,
747                                attrs=attr, controls=self.dirsync)
748             self.assertEqual(len(res), expected_num,
749                             "%u results, not %u for search %s, attr %s" %
750                             (len(res), expected_num, search, str(attr)))
751
752     # override method specifically for dirsync, i.e. add dirsync controls
753     def assert_attr_returned(self, expect_attr, samdb, attrs,
754                              no_result_ok=False):
755
756         # When using dirsync, the base DN we search on needs to be a naming
757         # context. Add an extra filter to ignore all the objects we aren't
758         # interested in
759         expr = self.single_obj_filter
760         res = samdb.search(self.base_dn, expression=expr, scope=SCOPE_SUBTREE,
761                            attrs=attrs, controls=self.dirsync)
762         if not no_result_ok:
763             self.assertEqual(1, len(res))
764
765         attr_returned = False
766         for msg in res:
767             if self.conf_attr in msg and len(msg[self.conf_attr]) > 0:
768                 attr_returned = True
769         self.assertEqual(expect_attr, attr_returned)
770
771     # override method specifically for dirsync (it has slightly different
772     # behaviour to normal when requesting specific attributes)
773     def assert_attr_visible(self, expect_attr, samdb=None):
774         if samdb is None:
775             samdb = self.ldb_user
776
777         # sanity-check confidential attribute is/isn't returned as expected
778         # based on the filter attributes we ask for
779         self.assert_attr_returned(expect_attr, samdb, attrs=None)
780         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
781
782         if expect_attr:
783             self.assert_attr_returned(expect_attr, samdb,
784                                       attrs=[self.conf_attr])
785         else:
786             # The behaviour with dirsync when asking solely for an attribute
787             # that you don't have rights to is a bit strange. Samba returns
788             # no result rather than an empty message with just the DN.
789             # Presumably this is due to dirsync module behaviour. It's not
790             # disclosive in that the DC behaves the same way as if you asked
791             # for a garbage/non-existent attribute
792             self.assert_attr_returned(expect_attr, samdb,
793                                       attrs=[self.conf_attr],
794                                       no_result_ok=True)
795             self.assert_attr_returned(expect_attr, samdb,
796                                       attrs=["garbage"], no_result_ok=True)
797
798         # filtering on a different attribute should never return the conf_attr
799         self.assert_attr_returned(expect_attr=False, samdb=samdb,
800                                   attrs=['name'])
801
802     # override method specifically for dirsync (total object count differs)
803     def assert_negative_searches(self, has_rights_to=0, samdb=None):
804         """Asserts user without rights cannot see objects in '!' searches"""
805
806         if samdb is None:
807             samdb = self.ldb_user
808
809         # because dirsync uses an extra filter, the total objects we expect
810         # here only includes the user objects (not the parent OU)
811         total_objects = len(self.all_users)
812         expected_results = self.negative_search_expected_results(has_rights_to,
813                                                                  total_objects)
814
815         for search, expected_num in expected_results.items():
816             self.assert_search_result(expected_num, search, samdb)
817
818     def test_search_with_dirsync(self):
819         """Checks dirsync controls don't reveal confidential attributes"""
820
821         self.assert_conf_attr_searches(has_rights_to="all")
822         self.assert_attr_visible(expect_attr=True)
823         self.assert_negative_searches(has_rights_to="all")
824
825         # make the test attribute confidential and check user can't see it,
826         # even if they use the dirsync controls
827         self.make_attr_confidential()
828
829         self.assert_conf_attr_searches(has_rights_to=0)
830         self.assert_attr_visible(expect_attr=False)
831         self.assert_negative_searches(has_rights_to=0)
832
833         # as a final sanity-check, make sure the admin can still see the attr
834         self.assert_conf_attr_searches(has_rights_to="all",
835                                        samdb=self.ldb_admin)
836         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
837         self.assert_negative_searches(has_rights_to="all",
838                                       samdb=self.ldb_admin)
839
840     def get_guid_string(self, dn):
841         """Returns an object's GUID (in string format)"""
842         res = self.ldb_admin.search(base=dn, attrs=["objectGUID"],
843                                     scope=SCOPE_BASE)
844         guid = res[0]['objectGUID'][0]
845         return self.ldb_admin.schema_format_value("objectGUID", guid).decode('utf-8')
846
847     def make_attr_preserve_on_delete(self):
848         """Marks the attribute under test as being preserve on delete"""
849
850         # work out the original 'searchFlags' value before we overwrite it
851         search_flags = int(self.get_attr_search_flags(self.attr_dn))
852
853         # check we've already set the confidential flag
854         self.assertNotEqual(0, search_flags & SEARCH_FLAG_CONFIDENTIAL)
855         search_flags |= SEARCH_FLAG_PRESERVEONDELETE
856
857         self.set_attr_search_flags(self.attr_dn, str(search_flags))
858
859     def change_attr_under_test(self, attr_name, attr_cn):
860         # change the attribute that the test code uses
861         self.conf_attr = attr_name
862         self.attr_dn = "{0},{1}".format(attr_cn, self.schema_dn)
863
864         # set the new attribute for the user-under-test
865         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
866
867         # 2 other users also have the attribute-under-test set (to a randomish
868         # value). Set the new attribute for them now (normally this gets done
869         # in the setUp())
870         for username in self.all_users:
871             if "other-user" in username:
872                 dn = self.get_user_dn(username)
873                 self.add_attr(dn, self.conf_attr, "xyz-blah")
874
875     def test_search_with_dirsync_deleted_objects(self):
876         """Checks dirsync doesn't reveal confidential info for deleted objs"""
877
878         # change the attribute we're testing (we'll preserve on delete for this
879         # test case, which means the attribute-under-test hangs around after
880         # the test case finishes, and would interfere with the searches for
881         # subsequent other test cases)
882         self.change_attr_under_test("carLicense", "CN=carLicense")
883
884         # Windows dirsync behaviour is a little strange when you request
885         # attributes that deleted objects no longer have, so just request 'all
886         # attributes' to simplify the test logic
887         self.attr_filters = [None, ["*"]]
888
889         # normally dirsync uses extra filters to exclude deleted objects that
890         # we're not interested in. Override these filters so they WILL include
891         # deleted objects, but only from this particular test run. We can do
892         # this by matching lastKnownParent against this test case's OU, which
893         # will match any deleted child objects.
894         ou_guid = self.get_guid_string(self.ou)
895         deleted_filter = "(lastKnownParent=<GUID={0}>)".format(ou_guid)
896
897         # the extra-filter will get combined via AND with the search expression
898         # we're testing, i.e. filter on the confidential attribute AND only
899         # include non-deleted objects, OR deleted objects from this test run
900         exclude_deleted_objs_filter = self.extra_filter
901         self.extra_filter = "(|{0}{1})".format(exclude_deleted_objs_filter,
902                                                deleted_filter)
903
904         # for matching on a single object, the search expresseion becomes:
905         # match exactly by account-name AND either a non-deleted object OR a
906         # deleted object from this test run
907         match_by_name = "(samaccountname={0})".format(self.conf_user)
908         not_deleted = "(!(isDeleted=*))"
909         self.single_obj_filter = "(&{0}(|{1}{2}))".format(match_by_name,
910                                                           not_deleted,
911                                                           deleted_filter)
912
913         # check that the search filters work as expected
914         self.assert_conf_attr_searches(has_rights_to="all")
915         self.assert_attr_visible(expect_attr=True)
916         self.assert_negative_searches(has_rights_to="all")
917
918         # make the test attribute confidential *and* preserve on delete.
919         self.make_attr_confidential()
920         self.make_attr_preserve_on_delete()
921
922         # check we can't see the objects now, even with using dirsync controls
923         self.assert_conf_attr_searches(has_rights_to=0)
924         self.assert_attr_visible(expect_attr=False)
925         self.assert_negative_searches(has_rights_to=0)
926
927         # now delete the users (except for the user whose LDB connection
928         # we're currently using)
929         for user in self.all_users:
930             if user is not self.user:
931                 self.ldb_admin.delete(self.get_user_dn(user))
932
933         # check we still can't see the objects
934         self.assert_conf_attr_searches(has_rights_to=0)
935         self.assert_negative_searches(has_rights_to=0)
936
937     def test_timing_attack(self):
938         # Create the machine account.
939         mach_name = f'conf_timing_{random.randint(0, 0xffff)}'
940         mach_dn = Dn(self.ldb_admin, f'CN={mach_name},{self.ou}')
941         details = {
942             'dn': mach_dn,
943             'objectclass': 'computer',
944             'sAMAccountName': f'{mach_name}$',
945         }
946         self.ldb_admin.add(details)
947
948         # Get the machine account's GUID.
949         res = self.ldb_admin.search(mach_dn,
950                                     attrs=['objectGUID'],
951                                     scope=SCOPE_BASE)
952         mach_guid = res[0].get('objectGUID', idx=0)
953
954         # Now we can create an msFVE-RecoveryInformation object that is a child
955         # of the machine account object.
956         recovery_dn = Dn(self.ldb_admin, str(mach_dn))
957         recovery_dn.add_child('CN=recovery_info')
958
959         secret_pw = 'Secret007'
960         not_secret_pw = 'Secret008'
961
962         secret_pw_utf8 = secret_pw.encode('utf-8')
963
964         # The crucial attribute, msFVE-RecoveryPassword, is a confidential
965         # attribute.
966         conf_attr = 'msFVE-RecoveryPassword'
967
968         m = Message(recovery_dn)
969         m['objectClass'] = 'msFVE-RecoveryInformation'
970         m['msFVE-RecoveryGuid'] = mach_guid
971         m[conf_attr] = secret_pw
972         self.ldb_admin.add(m)
973
974         attrs = [conf_attr]
975
976         # Search for the confidential attribute as administrator, ensuring it
977         # is visible.
978         res = self.ldb_admin.search(recovery_dn,
979                                     attrs=attrs,
980                                     scope=SCOPE_BASE)
981         self.assertEqual(1, len(res))
982         pw = res[0].get(conf_attr, idx=0)
983         self.assertEqual(secret_pw_utf8, pw)
984
985         # Repeat the search with an expression matching on the confidential
986         # attribute. This should also work.
987         res = self.ldb_admin.search(
988             recovery_dn,
989             attrs=attrs,
990             expression=f'({conf_attr}={secret_pw})',
991             scope=SCOPE_BASE)
992         self.assertEqual(1, len(res))
993         pw = res[0].get(conf_attr, idx=0)
994         self.assertEqual(secret_pw_utf8, pw)
995
996         # Search for the attribute as an unprivileged user. It should not be
997         # visible.
998         user_res = self.ldb_user.search(recovery_dn,
999                                         attrs=attrs,
1000                                         scope=SCOPE_BASE)
1001         pw = user_res[0].get(conf_attr, idx=0)
1002         # The attribute should be None.
1003         self.assertIsNone(pw)
1004
1005         # We use LDAP_MATCHING_RULE_TRANSITIVE_EVAL to create a search
1006         # expression that takes a long time to execute, by setting off another
1007         # search each time it is evaluated. It makes no difference that the
1008         # object on which we're searching has no 'member' attribute.
1009         dummy_dn = 'cn=user,cn=users,dc=samba,dc=example,dc=com'
1010         slow_subexpr = f'(member:1.2.840.113556.1.4.1941:={dummy_dn})'
1011         slow_expr = f'(|{slow_subexpr * 100})'
1012
1013         # The full search expression. It comprises a match on the confidential
1014         # attribute joined by an AND to our slow search expression, The AND
1015         # operator is short-circuiting, so if our first subexpression fails to
1016         # match, we'll bail out of the search early. Otherwise, we'll evaluate
1017         # the slow part; as its subexpressions are joined by ORs, and will all
1018         # fail to match, every one of them will need to be evaluated. By
1019         # measuring how long the search takes, we'll be able to infer whether
1020         # the confidential attribute matched or not.
1021
1022         # This is bad if we are not an administrator, and are able to use this
1023         # to determine the values of confidential attributes. Therefore we need
1024         # to ensure we can't observe any difference in timing.
1025         correct_expr = f'(&({conf_attr}={secret_pw}){slow_expr})'
1026         wrong_expr = f'(&({conf_attr}={not_secret_pw}){slow_expr})'
1027
1028         def standard_uncertainty_bounds(times):
1029             mean = statistics.mean(times)
1030             stdev = statistics.stdev(times, mean)
1031
1032             return (mean - stdev, mean + stdev)
1033
1034         # Perform a number of searches with both correct and incorrect
1035         # expressions, and return the uncertainty bounds for each.
1036         def time_searches(samdb):
1037             warmup_samples = 3
1038             samples = 10
1039             matching_times = []
1040             non_matching_times = []
1041
1042             for _ in range(warmup_samples):
1043                 samdb.search(recovery_dn,
1044                              attrs=attrs,
1045                              expression=correct_expr,
1046                              scope=SCOPE_BASE)
1047
1048             for _ in range(samples):
1049                 # Measure the time taken for a search, for both a matching and
1050                 # a non-matching search expression.
1051
1052                 prev = time.time()
1053                 samdb.search(recovery_dn,
1054                              attrs=attrs,
1055                              expression=correct_expr,
1056                              scope=SCOPE_BASE)
1057                 now = time.time()
1058                 matching_times.append(now - prev)
1059
1060                 prev = time.time()
1061                 samdb.search(recovery_dn,
1062                              attrs=attrs,
1063                              expression=wrong_expr,
1064                              scope=SCOPE_BASE)
1065                 now = time.time()
1066                 non_matching_times.append(now - prev)
1067
1068             matching = standard_uncertainty_bounds(matching_times)
1069             non_matching = standard_uncertainty_bounds(non_matching_times)
1070             return matching, non_matching
1071
1072         def assertRangesDistinct(a, b):
1073             a0, a1 = a
1074             b0, b1 = b
1075             self.assertLess(min(a1, b1), max(a0, b0))
1076
1077         def assertRangesOverlap(a, b):
1078             a0, a1 = a
1079             b0, b1 = b
1080             self.assertGreaterEqual(min(a1, b1), max(a0, b0))
1081
1082         # For an administrator, the uncertainty bounds for matching and
1083         # non-matching searches should be distinct. This shows that the two
1084         # cases are distinguishable, and therefore that confidential attributes
1085         # are visible.
1086         admin_matching, admin_non_matching = time_searches(self.ldb_admin)
1087         assertRangesDistinct(admin_matching, admin_non_matching)
1088
1089         # The user cannot view the confidential attribute, so the uncertainty
1090         # bounds for matching and non-matching searches must overlap. The two
1091         # cases must be indistinguishable.
1092         user_matching, user_non_matching = time_searches(self.ldb_user)
1093         assertRangesOverlap(user_matching, user_non_matching)
1094
1095
1096 TestProgram(module=__name__, opts=subunitopts)