3f82a359553012b2743d7c4683407fcc009b4ef2
[samba.git] / source4 / dsdb / tests / python / confidential_attr.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests that confidential attributes (or attributes protected by a ACL that
5 # denies read access) cannot be guessed through wildcard DB searches.
6 #
7 # Copyright (C) Catalyst.Net Ltd
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22 import optparse
23 import sys
24 sys.path.insert(0, "bin/python")
25
26 import samba
27 import os
28 from samba.tests.subunitrun import SubunitOptions, TestProgram
29 import samba.getopt as options
30 from ldb import SCOPE_BASE, SCOPE_SUBTREE
31 from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_PRESERVEONDELETE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
34 from samba.auth import system_session
35 from samba import gensec, sd_utils
36 from samba.samdb import SamDB
37 from samba.credentials import Credentials, DONT_USE_KERBEROS
38 import samba.tests
39 from samba.tests import delete_force
40 import samba.dsdb
41
42 parser = optparse.OptionParser("confidential_attr.py [options] <host>")
43 sambaopts = options.SambaOptions(parser)
44 parser.add_option_group(sambaopts)
45 parser.add_option_group(options.VersionOptions(parser))
46
47 # use command line creds if available
48 credopts = options.CredentialsOptions(parser)
49 parser.add_option_group(credopts)
50 subunitopts = SubunitOptions(parser)
51 parser.add_option_group(subunitopts)
52
53 opts, args = parser.parse_args()
54
55 if len(args) < 1:
56     parser.print_usage()
57     sys.exit(1)
58
59 host = args[0]
60 if "://" not in host:
61     ldaphost = "ldap://%s" % host
62 else:
63     ldaphost = host
64     start = host.rindex("://")
65     host = host.lstrip(start + 3)
66
67 lp = sambaopts.get_loadparm()
68 creds = credopts.get_credentials(lp)
69 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
70
71 # When a user does not have access rights to view the objects' attributes,
72 # Windows and Samba behave slightly differently.
73 # A windows DC will always act as if the hidden attribute doesn't exist AT ALL
74 # (for an unprivileged user). So, even for a user that lacks access rights,
75 # the inverse/'!' queries should return ALL objects. This is similar to the
76 # kludgeaclredacted behaviour on Samba.
77 # However, on Samba (for implementation simplicity) we never return a matching
78 # result for an unprivileged user.
79 # Either approach is OK, so long as it gets applied consistently and we don't
80 # disclose any sensitive details by varying what gets returned by the search.
81 DC_MODE_RETURN_NONE = 0
82 DC_MODE_RETURN_ALL = 1
83
84 #
85 # Tests start here
86 #
87 class ConfidentialAttrCommon(samba.tests.TestCase):
88
89     def setUp(self):
90         super(ConfidentialAttrCommon, self).setUp()
91
92         self.ldb_admin = SamDB(ldaphost, credentials=creds,
93                                session_info=system_session(lp), lp=lp)
94         self.user_pass = "samba123@"
95         self.base_dn = self.ldb_admin.domain_dn()
96         self.schema_dn = self.ldb_admin.get_schema_basedn()
97         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
98
99         # the tests work by setting the 'Confidential' bit in the searchFlags
100         # for an existing schema attribute. This only works against Windows if
101         # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
102         # schema attribute being modified. There are only a few attributes that
103         # meet this criteria (most of which only apply to 'user' objects)
104         self.conf_attr = "homePostalAddress"
105         attr_cn = "CN=Address-Home"
106         # schemaIdGuid for homePostalAddress (used for ACE tests)
107         self.conf_attr_guid = "16775781-47f3-11d1-a9c3-0000f80367c1"
108         self.conf_attr_sec_guid = "77b5b886-944a-11d1-aebd-0000f80367c1"
109         self.attr_dn = "{},{}".format(attr_cn, self.schema_dn)
110
111         userou = "OU=conf-attr-test"
112         self.ou = "{},{}".format(userou, self.base_dn)
113         self.ldb_admin.create_ou(self.ou)
114
115         # use a common username prefix, so we can use sAMAccountName=CATC-* as
116         # a search filter to only return the users we're interested in
117         self.user_prefix = "catc-"
118
119         # add a test object with this attribute set
120         self.conf_value = "abcdef"
121         self.conf_user = "{}conf-user".format(self.user_prefix)
122         self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
123         self.conf_dn = self.get_user_dn(self.conf_user)
124         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
125
126         # add a sneaky user that will try to steal our secrets
127         self.user = "{}sneaky-user".format(self.user_prefix)
128         self.ldb_admin.newuser(self.user, self.user_pass, userou=userou)
129         self.ldb_user = self.get_ldb_connection(self.user, self.user_pass)
130
131         self.all_users = [self.user, self.conf_user]
132
133         # add some other users that also have confidential attributes, so we can
134         # check we don't disclose their details, particularly in '!' searches
135         for i in range(1, 3):
136             username = "{}other-user{}".format(self.user_prefix, i)
137             self.ldb_admin.newuser(username, self.user_pass, userou=userou)
138             userdn = self.get_user_dn(username)
139             self.add_attr(userdn, self.conf_attr, "xyz{}".format(i))
140             self.all_users.append(username)
141
142         # there are 4 users in the OU, plus the OU itself
143         self.test_dn = self.ou
144         self.total_objects = len(self.all_users) + 1
145         self.objects_with_attr = 3
146
147         # sanity-check the flag is not already set (this'll cause problems if
148         # previous test run didn't clean up properly)
149         search_flags = self.get_attr_search_flags(self.attr_dn)
150         self.assertTrue(int(search_flags) & SEARCH_FLAG_CONFIDENTIAL == 0,
151                         "{} searchFlags already {}".format(self.conf_attr,
152                                                            search_flags))
153
154     def tearDown(self):
155         super(ConfidentialAttrCommon, self).tearDown()
156         self.ldb_admin.delete(self.ou, ["tree_delete:1"])
157
158     def add_attr(self, dn, attr, value):
159         m = Message()
160         m.dn = Dn(self.ldb_admin, dn)
161         m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
162         self.ldb_admin.modify(m)
163
164     def set_attr_search_flags(self, attr_dn, flags):
165         """Modifies the searchFlags for an object in the schema"""
166         m = Message()
167         m.dn = Dn(self.ldb_admin, attr_dn)
168         m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
169                                           'searchFlags')
170         self.ldb_admin.modify(m)
171
172         # note we have to update the schema for this change to take effect (on
173         # Windows, at least)
174         self.ldb_admin.set_schema_update_now()
175
176     def get_attr_search_flags(self, attr_dn):
177         """Marks the attribute under test as being confidential"""
178         res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
179                                     attrs=['searchFlags'])
180         return res[0]['searchFlags'][0]
181
182     def make_attr_confidential(self):
183         """Marks the attribute under test as being confidential"""
184
185         # work out the original 'searchFlags' value before we overwrite it
186         old_value = self.get_attr_search_flags(self.attr_dn)
187
188         self.set_attr_search_flags(self.attr_dn, str(SEARCH_FLAG_CONFIDENTIAL))
189
190         # reset the value after the test completes
191         self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
192
193     # The behaviour of the DC can differ in some cases, depending on whether
194     # we're talking to a Windows DC or a Samba DC
195     def guess_dc_mode(self):
196         # if we're in selftest, we can be pretty sure it's a Samba DC
197         if os.environ.get('SAMBA_SELFTEST') == '1':
198             return DC_MODE_RETURN_NONE
199
200         searches = self.get_negative_match_all_searches()
201         res = self.ldb_user.search(self.test_dn, expression=searches[0],
202                                    scope=SCOPE_SUBTREE)
203
204         # we default to DC_MODE_RETURN_NONE (samba).Update this if it
205         # looks like we're talking to a Windows DC
206         if len(res) == self.total_objects:
207             return DC_MODE_RETURN_ALL
208
209         # otherwise assume samba DC behaviour
210         return DC_MODE_RETURN_NONE
211
212     def get_user_dn(self, name):
213         return "CN={},{}".format(name, self.ou)
214
215     def get_user_sid_string(self, username):
216         user_dn = self.get_user_dn(username)
217         user_sid = self.sd_utils.get_object_sid(user_dn)
218         return str(user_sid)
219
220     def get_ldb_connection(self, target_username, target_password):
221         creds_tmp = Credentials()
222         creds_tmp.set_username(target_username)
223         creds_tmp.set_password(target_password)
224         creds_tmp.set_domain(creds.get_domain())
225         creds_tmp.set_realm(creds.get_realm())
226         creds_tmp.set_workstation(creds.get_workstation())
227         features = creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL
228         creds_tmp.set_gensec_features(features)
229         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
230         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
231         return ldb_target
232
233     def assert_not_in_result(self, res, exclude_dn):
234         for msg in res:
235             self.assertNotEqual(msg.dn, exclude_dn,
236                                 "Search revealed object {}".format(exclude_dn))
237
238     def assert_search_result(self, expected_num, expr, samdb):
239
240         # try asking for different attributes back: None/all, the confidential
241         # attribute itself, and a random unrelated attribute
242         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
243         for attr in attr_filters:
244             res = samdb.search(self.test_dn, expression=expr,
245                                scope=SCOPE_SUBTREE, attrs=attr)
246             self.assertTrue(len(res) == expected_num,
247                             "%u results, not %u for search %s, attr %s" %
248                             (len(res), expected_num, expr, str(attr)))
249
250     # return a selection of searches that match exactly against the test object
251     def get_exact_match_searches(self):
252         first_char = self.conf_value[:1]
253         last_char = self.conf_value[-1:]
254         test_attr = self.conf_attr
255
256         searches = [
257             # search for the attribute using a sub-string wildcard
258             # (which could reveal the attribute's actual value)
259             "({}={}*)".format(test_attr, first_char),
260             "({}=*{})".format(test_attr, last_char),
261
262             # sanity-check equality against an exact match on value
263             "({}={})".format(test_attr, self.conf_value),
264
265             # '~=' searches don't work against Samba
266             # sanity-check an approx search against an exact match on value
267             # "({}~={})".format(test_attr, self.conf_value),
268
269             # check wildcard in an AND search...
270             "(&({}={}*)(objectclass=*))".format(test_attr, first_char),
271
272             # ...an OR search (against another term that will never match)
273             "(|({}={}*)(objectclass=banana))".format(test_attr, first_char)]
274
275         return searches
276
277     # return searches that match any object with the attribute under test
278     def get_match_all_searches(self):
279         searches = [
280             # check a full wildcard against the confidential attribute
281             # (which could reveal the attribute's presence/absence)
282             "({}=*)".format(self.conf_attr),
283
284             # check wildcard in an AND search...
285             "(&(objectclass=*)({}=*))".format(self.conf_attr),
286
287             # ...an OR search (against another term that will never match)
288             "(|(objectclass=banana)({}=*))".format(self.conf_attr),
289
290             # check <=, and >= expressions that would normally find a match
291             "({}>=0)".format(self.conf_attr),
292             "({}<=ZZZZZZZZZZZ)".format(self.conf_attr)]
293
294         return searches
295
296     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
297         """Check searches against the attribute under test work as expected"""
298
299         if samdb is None:
300             samdb = self.ldb_user
301
302         if has_rights_to == "all":
303             has_rights_to = self.objects_with_attr
304
305         # these first few searches we just expect to match against the one
306         # object under test that we're trying to guess the value of
307         expected_num = 1 if has_rights_to > 0 else 0
308         for search in self.get_exact_match_searches():
309             self.assert_search_result(expected_num, search, samdb)
310
311         # these next searches will match any objects we have rights to see
312         expected_num = has_rights_to
313         for search in self.get_match_all_searches():
314             self.assert_search_result(expected_num, search, samdb)
315
316     # The following are double negative searches (i.e. NOT non-matching-
317     # condition) which will therefore match ALL objects, including the test
318     # object(s).
319     def get_negative_match_all_searches(self):
320         first_char = self.conf_value[:1]
321         last_char = self.conf_value[-1:]
322         not_first_char = chr(ord(first_char) + 1)
323         not_last_char = chr(ord(last_char) + 1)
324
325         searches = [
326             "(!({}={}*))".format(self.conf_attr, not_first_char),
327             "(!({}=*{}))".format(self.conf_attr, not_last_char)]
328         return searches
329
330     # the following searches will not match against the test object(s). So
331     # a user with sufficient rights will see an inverse sub-set of objects.
332     # (An unprivileged user would either see all objects on Windows, or no
333     # objects on Samba)
334     def get_inverse_match_searches(self):
335         first_char = self.conf_value[:1]
336         last_char = self.conf_value[-1:]
337         searches = [
338             "(!({}={}*))".format(self.conf_attr, first_char),
339             "(!({}=*{}))".format(self.conf_attr, last_char)]
340         return searches
341
342     def negative_searches_all_rights(self, total_objects=None):
343         expected_results = {}
344
345         if total_objects is None:
346             total_objects = self.total_objects
347
348         # these searches should match ALL objects (including the OU)
349         for search in self.get_negative_match_all_searches():
350             expected_results[search] = total_objects
351
352         # a ! wildcard should only match the objects without the attribute
353         search = "(!({}=*))".format(self.conf_attr)
354         expected_results[search] = total_objects - self.objects_with_attr
355
356         # whereas the inverse searches should match all objects *except* the
357         # one under test
358         for search in self.get_inverse_match_searches():
359             expected_results[search] = total_objects - 1
360
361         return expected_results
362
363     # Returns the expected negative (i.e. '!') search behaviour when talking to
364     # a DC with DC_MODE_RETURN_ALL behaviour, i.e. we assert that users
365     # without rights always see ALL objects in '!' searches
366     def negative_searches_return_all(self, has_rights_to=0,
367                                      total_objects=None):
368         """Asserts user without rights cannot see objects in '!' searches"""
369         expected_results = {}
370
371         if total_objects is None:
372             total_objects = self.total_objects
373
374         # Windows 'hides' objects by always returning all of them, so negative
375         # searches that match all objects will simply return all objects
376         for search in self.get_negative_match_all_searches():
377             expected_results[search] = total_objects
378
379         # if the search is matching on an inverse subset (everything except the
380         # object under test), the
381         inverse_searches = self.get_inverse_match_searches()
382         inverse_searches += ["(!({}=*))".format(self.conf_attr)]
383
384         for search in inverse_searches:
385             expected_results[search] = total_objects - has_rights_to
386
387         return expected_results
388
389     # Returns the expected negative (i.e. '!') search behaviour when talking to
390     # a DC with DC_MODE_RETURN_NONE behaviour, i.e. we assert that users
391     # without rights cannot see objects in '!' searches at all
392     def negative_searches_return_none(self, has_rights_to=0):
393         expected_results = {}
394
395         # the 'match-all' searches should only return the objects we have
396         # access rights to (if any)
397         for search in self.get_negative_match_all_searches():
398             expected_results[search] = has_rights_to
399
400         # for inverse matches, we should NOT be told about any objects at all
401         inverse_searches = self.get_inverse_match_searches()
402         inverse_searches += ["(!({}=*))".format(self.conf_attr)]
403         for search in inverse_searches:
404             expected_results[search] = 0
405
406         return expected_results
407
408     # Returns the expected negative (i.e. '!') search behaviour. This varies
409     # depending on what type of DC we're talking to (i.e. Windows or Samba)
410     # and what access rights the user has
411     def negative_search_expected_results(self, has_rights_to, dc_mode,
412                                          total_objects=None):
413
414         if has_rights_to == "all":
415             expect_results = self.negative_searches_all_rights(total_objects)
416
417         # if it's a Samba DC, we only expect the 'match-all' searches to return
418         # the objects that we have access rights to (all others are hidden).
419         # Whereas Windows 'hides' the objects by always returning all of them
420         elif dc_mode == DC_MODE_RETURN_NONE:
421             expect_results = self.negative_searches_return_none(has_rights_to)
422         else:
423             expect_results = self.negative_searches_return_all(has_rights_to,
424                                                                total_objects)
425         return expect_results
426
427     def assert_negative_searches(self, has_rights_to=0,
428                                  dc_mode=DC_MODE_RETURN_NONE, samdb=None):
429         """Asserts user without rights cannot see objects in '!' searches"""
430
431         if samdb is None:
432             samdb = self.ldb_user
433
434         # build a dictionary of key=search-expr, value=expected_num assertions
435         expected_results = self.negative_search_expected_results(has_rights_to,
436                                                                  dc_mode)
437
438         for search, expected_num in expected_results.items():
439             self.assert_search_result(expected_num, search, samdb)
440
441     def assert_attr_returned(self, expect_attr, samdb, attrs):
442         # does a query that should always return a successful result, and
443         # checks whether the confidential attribute is present
444         res = samdb.search(self.conf_dn, expression="(objectClass=*)",
445                            scope=SCOPE_SUBTREE, attrs=attrs)
446         self.assertTrue(len(res) == 1)
447
448         attr_returned = False
449         for msg in res:
450             if self.conf_attr in msg:
451                 attr_returned = True
452         self.assertEqual(expect_attr, attr_returned)
453
454     def assert_attr_visible(self, expect_attr, samdb=None):
455         if samdb is None:
456             samdb = self.ldb_user
457
458         # sanity-check confidential attribute is/isn't returned as expected
459         # based on the filter attributes we ask for
460         self.assert_attr_returned(expect_attr, samdb, attrs=None)
461         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
462         self.assert_attr_returned(expect_attr, samdb, attrs=[self.conf_attr])
463
464         # filtering on a different attribute should never return the conf_attr
465         self.assert_attr_returned(expect_attr=False, samdb=samdb,
466                                   attrs=['name'])
467
468     def assert_attr_visible_to_admin(self):
469         # sanity-check the admin user can always see the confidential attribute
470         self.assert_conf_attr_searches(has_rights_to="all", samdb=self.ldb_admin)
471         self.assert_negative_searches(has_rights_to="all", samdb=self.ldb_admin)
472         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
473
474
475 class ConfidentialAttrTest(ConfidentialAttrCommon):
476     def test_basic_search(self):
477         """Basic test confidential attributes aren't disclosed via searches"""
478
479         # check we can see a non-confidential attribute in a basic searches
480         self.assert_conf_attr_searches(has_rights_to="all")
481         self.assert_negative_searches(has_rights_to="all")
482         self.assert_attr_visible(expect_attr=True)
483
484         # now make the attribute confidential. Repeat the tests and check that
485         # an ordinary user can't see the attribute, or indirectly match on the
486         # attribute via the search expression
487         self.make_attr_confidential()
488
489         self.assert_conf_attr_searches(has_rights_to=0)
490         dc_mode = self.guess_dc_mode()
491         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
492         self.assert_attr_visible(expect_attr=False)
493
494         # sanity-check we haven't hidden the attribute from the admin as well
495         self.assert_attr_visible_to_admin()
496
497     def _test_search_with_allow_acl(self, allow_ace):
498         """Checks a ACE with 'CR' rights can override a confidential attr"""
499         # make the test attribute confidential and check user can't see it
500         self.make_attr_confidential()
501
502         self.assert_conf_attr_searches(has_rights_to=0)
503         dc_mode = self.guess_dc_mode()
504         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
505         self.assert_attr_visible(expect_attr=False)
506
507         # apply the allow ACE to the object under test
508         self.sd_utils.dacl_add_ace(self.conf_dn, allow_ace)
509
510         # the user should now be able to see the attribute for the one object
511         # we gave it rights to
512         self.assert_conf_attr_searches(has_rights_to=1)
513         self.assert_negative_searches(has_rights_to=1, dc_mode=dc_mode)
514         self.assert_attr_visible(expect_attr=True)
515
516         # sanity-check the admin can still see the attribute
517         self.assert_attr_visible_to_admin()
518
519     def test_search_with_attr_acl_override(self):
520         """Make the confidential attr visible via an OA attr ACE"""
521
522         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
523         # attribute under test, so the user can see it once more
524         user_sid = self.get_user_sid_string(self.user)
525         ace = "(OA;;CR;{};;{})".format(self.conf_attr_guid, user_sid)
526
527         self._test_search_with_allow_acl(ace)
528
529     def test_search_with_propset_acl_override(self):
530         """Make the confidential attr visible via a Property-set ACE"""
531
532         # set the SEC_ADS_CONTROL_ACCESS bit ('CR') for the user for the
533         # property-set containing the attribute under test (i.e. the
534         # attributeSecurityGuid), so the user can see it once more
535         user_sid = self.get_user_sid_string(self.user)
536         ace = "(OA;;CR;{};;{})".format(self.conf_attr_sec_guid, user_sid)
537
538         self._test_search_with_allow_acl(ace)
539
540     def test_search_with_acl_override(self):
541         """Make the confidential attr visible via a general 'allow' ACE"""
542
543         # set the allow SEC_ADS_CONTROL_ACCESS bit ('CR') for the user
544         user_sid = self.get_user_sid_string(self.user)
545         ace = "(A;;CR;;;{})".format(user_sid)
546
547         self._test_search_with_allow_acl(ace)
548
549     def test_search_with_blanket_oa_acl(self):
550         """Make the confidential attr visible via a non-specific OA ACE"""
551
552         # this just checks that an Object Access (OA) ACE without a GUID
553         # specified will work the same as an 'Access' (A) ACE
554         user_sid = self.get_user_sid_string(self.user)
555         ace = "(OA;;CR;;;{})".format(user_sid)
556
557         self._test_search_with_allow_acl(ace)
558
559     def _test_search_with_neutral_acl(self, neutral_ace):
560         """Checks that a user does NOT gain access via an unrelated ACE"""
561
562         # make the test attribute confidential and check user can't see it
563         self.make_attr_confidential()
564
565         self.assert_conf_attr_searches(has_rights_to=0)
566         dc_mode = self.guess_dc_mode()
567         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
568         self.assert_attr_visible(expect_attr=False)
569
570         # apply the ACE to the object under test
571         self.sd_utils.dacl_add_ace(self.conf_dn, neutral_ace)
572
573         # this should make no difference to the user's ability to see the attr
574         self.assert_conf_attr_searches(has_rights_to=0)
575         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
576         self.assert_attr_visible(expect_attr=False)
577
578         # sanity-check the admin can still see the attribute
579         self.assert_attr_visible_to_admin()
580
581     def test_search_with_neutral_acl(self):
582         """Give the user all rights *except* CR for any attributes"""
583
584         # give the user all rights *except* CR and check it makes no difference
585         user_sid = self.get_user_sid_string(self.user)
586         ace = "(A;;RPWPCCDCLCLORCWOWDSDDTSW;;;{})".format(user_sid)
587         self._test_search_with_neutral_acl(ace)
588
589     def test_search_with_neutral_attr_acl(self):
590         """Give the user all rights *except* CR for the attribute under test"""
591
592         # giving user all OA rights *except* CR should make no difference
593         user_sid = self.get_user_sid_string(self.user)
594         rights = "RPWPCCDCLCLORCWOWDSDDTSW"
595         ace = "(OA;;{};{};;{})".format(rights, self.conf_attr_guid, user_sid)
596         self._test_search_with_neutral_acl(ace)
597
598     def test_search_with_neutral_cr_acl(self):
599         """Give the user CR rights for *another* unrelated attribute"""
600
601         # giving user object-access CR rights to an unrelated attribute
602         user_sid = self.get_user_sid_string(self.user)
603         # use the GUID for sAMAccountName here (for no particular reason)
604         unrelated_attr = "3e0abfd0-126a-11d0-a060-00aa006c33ed"
605         ace = "(OA;;CR;{};;{})".format(unrelated_attr, user_sid)
606         self._test_search_with_neutral_acl(ace)
607
608
609 # Check that a Deny ACL on an attribute doesn't reveal confidential info
610 class ConfidentialAttrTestDenyAcl(ConfidentialAttrCommon):
611
612     def assert_not_in_result(self, res, exclude_dn):
613         for msg in res:
614             self.assertNotEqual(msg.dn, exclude_dn,
615                                 "Search revealed object {}".format(exclude_dn))
616
617     # deny ACL tests are slightly different as we are only denying access to
618     # the one object under test (rather than any objects with that attribute).
619     # Therefore we need an extra check that we don't reveal the test object
620     # in the search, if we're not supposed to
621     def assert_search_result(self, expected_num, expr, samdb,
622                              excl_testobj=False):
623
624         # try asking for different attributes back: None/all, the confidential
625         # attribute itself, and a random unrelated attribute
626         attr_filters = [None, ["*"], [self.conf_attr], ['name']]
627         for attr in attr_filters:
628             res = samdb.search(self.test_dn, expression=expr,
629                                scope=SCOPE_SUBTREE, attrs=attr)
630             self.assertTrue(len(res) == expected_num,
631                             "%u results, not %u for search %s, attr %s" %
632                             (len(res), expected_num, expr, str(attr)))
633
634             # assert we haven't revealed the hidden test-object
635             if excl_testobj:
636                 self.assert_not_in_result(res, exclude_dn=self.conf_dn)
637
638     # we make a few tweaks to the regular version of this function to cater to
639     # denying specifically one object via an ACE
640     def assert_conf_attr_searches(self, has_rights_to=0, samdb=None):
641         """Check searches against the attribute under test work as expected"""
642
643         if samdb is None:
644             samdb = self.ldb_user
645
646         # make sure the test object is not returned if we've been denied rights
647         # to it via an ACE
648         excl_testobj = True if has_rights_to == "deny-one" else False
649
650         # these first few searches we just expect to match against the one
651         # object under test that we're trying to guess the value of
652         expected_num = 1 if has_rights_to == "all" else 0
653
654         for search in self.get_exact_match_searches():
655             self.assert_search_result(expected_num, search, samdb,
656                                       excl_testobj)
657
658         # these next searches will match any objects with the attribute that
659         # we have rights to see (i.e. all except the object under test)
660         if has_rights_to == "all":
661             expected_num = self.objects_with_attr
662         elif has_rights_to == "deny-one":
663             expected_num = self.objects_with_attr - 1
664
665         for search in self.get_match_all_searches():
666             self.assert_search_result(expected_num, search, samdb,
667                                       excl_testobj)
668
669     def negative_searches_return_none(self, has_rights_to=0):
670         expected_results = {}
671
672         # on Samba we will see the objects we have rights to, but the one we
673         # are denied access to will be hidden
674         searches = self.get_negative_match_all_searches()
675         searches += self.get_inverse_match_searches()
676         for search in searches:
677             expected_results[search] = self.total_objects - 1
678
679         # The wildcard returns the objects without this attribute as normal.
680         search = "(!({}=*))".format(self.conf_attr)
681         expected_results[search] = self.total_objects - self.objects_with_attr
682         return expected_results
683
684     def negative_searches_return_all(self, has_rights_to=0,
685                                      total_objects=None):
686         expected_results = {}
687
688         # When a user lacks access rights to an object, Windows 'hides' it in
689         # '!' searches by always returning it, regardless of whether it matches
690         searches = self.get_negative_match_all_searches()
691         searches += self.get_inverse_match_searches()
692         for search in searches:
693             expected_results[search] = self.total_objects
694
695         # in the wildcard case, the one object we don't have rights to gets
696         # bundled in with the objects that don't have the attribute at all
697         search = "(!({}=*))".format(self.conf_attr)
698         has_rights_to = self.objects_with_attr - 1
699         expected_results[search] = self.total_objects - has_rights_to
700         return expected_results
701
702     def assert_negative_searches(self, has_rights_to=0,
703                                  dc_mode=DC_MODE_RETURN_NONE, samdb=None):
704         """Asserts user without rights cannot see objects in '!' searches"""
705
706         if samdb is None:
707             samdb = self.ldb_user
708
709         # As the deny ACL is only denying access to one particular object, add
710         # an extra check that the denied object is not returned. (We can only
711         # assert this if the '!'/negative search behaviour is to suppress any
712         # objects we don't have access rights to)
713         excl_testobj = False
714         if has_rights_to != "all" and dc_mode == DC_MODE_RETURN_NONE:
715             excl_testobj = True
716
717         # build a dictionary of key=search-expr, value=expected_num assertions
718         expected_results = self.negative_search_expected_results(has_rights_to,
719                                                                  dc_mode)
720
721         for search, expected_num in expected_results.items():
722             self.assert_search_result(expected_num, search, samdb,
723                                       excl_testobj=excl_testobj)
724
725     def _test_search_with_deny_acl(self, ace):
726         # check the user can see the attribute initially
727         self.assert_conf_attr_searches(has_rights_to="all")
728         self.assert_negative_searches(has_rights_to="all")
729         self.assert_attr_visible(expect_attr=True)
730
731         # add the ACE that denies access to the attr under test
732         self.sd_utils.dacl_add_ace(self.conf_dn, ace)
733
734         # the user shouldn't be able to see the attribute anymore
735         self.assert_conf_attr_searches(has_rights_to="deny-one")
736         dc_mode = self.guess_dc_mode()
737         self.assert_negative_searches(has_rights_to="deny-one",
738                                       dc_mode=dc_mode)
739         self.assert_attr_visible(expect_attr=False)
740
741         # sanity-check we haven't hidden the attribute from the admin as well
742         self.assert_attr_visible_to_admin()
743
744     def test_search_with_deny_attr_acl(self):
745         """Checks a deny ACE works the same way as a confidential attribute"""
746
747         # add an ACE that denies the user Read Property (RP) access to the attr
748         # (which is similar to making the attribute confidential)
749         user_sid = self.get_user_sid_string(self.user)
750         ace = "(OD;;RP;{};;{})".format(self.conf_attr_guid, user_sid)
751
752         # check the user cannot see the attribute anymore
753         self._test_search_with_deny_acl(ace)
754
755     def test_search_with_deny_acl(self):
756         """Checks a blanket deny ACE denies access to an object's attributes"""
757
758         # add an blanket deny ACE for Read Property (RP) rights
759         user_dn = self.get_user_dn(self.user)
760         user_sid = self.sd_utils.get_object_sid(user_dn)
761         ace = "(D;;RP;;;{})".format(str(user_sid))
762
763         # check the user cannot see the attribute anymore
764         self._test_search_with_deny_acl(ace)
765
766     def test_search_with_deny_propset_acl(self):
767         """Checks a deny ACE on the attribute's Property-Set"""
768
769         # add an blanket deny ACE for Read Property (RP) rights
770         user_sid = self.get_user_sid_string(self.user)
771         ace = "(OD;;RP;{};;{})".format(self.conf_attr_sec_guid, user_sid)
772
773         # check the user cannot see the attribute anymore
774         self._test_search_with_deny_acl(ace)
775
776     def test_search_with_blanket_oa_deny_acl(self):
777         """Checks a non-specific 'OD' ACE works the same as a 'D' ACE"""
778
779         # this just checks that adding a 'Object Deny' (OD) ACE without
780         # specifying a GUID will work the same way as a 'Deny' (D) ACE
781         user_sid = self.get_user_sid_string(self.user)
782         ace = "(OD;;RP;;;{})".format(user_sid)
783
784         # check the user cannot see the attribute anymore
785         self._test_search_with_deny_acl(ace)
786
787
788 # Check that using the dirsync controls doesn't reveal confidential attributes
789 class ConfidentialAttrTestDirsync(ConfidentialAttrCommon):
790
791     def setUp(self):
792         super(ConfidentialAttrTestDirsync, self).setUp()
793         self.dirsync = ["dirsync:1:1:1000"]
794
795         # because we need to search on the base DN when using the dirsync
796         # controls, we need an extra filter for the inverse ('!') search,
797         # so we don't get thousands of objects returned
798         self.extra_filter = \
799             "(&(samaccountname={}*)(!(isDeleted=*)))".format(self.user_prefix)
800         self.single_obj_filter = \
801             "(&(samaccountname={})(!(isDeleted=*)))".format(self.conf_user)
802
803         self.attr_filters = [None, ["*"], ["name"]]
804
805         # Note dirsync behaviour is slighty different for the attribute under
806         # test - when you have full access rights, it only returns the objects
807         # that actually have this attribute (i.e. it doesn't return an empty
808         # message with just the DN). So we add the 'name' attribute into the
809         # attribute filter to avoid complicating our assertions further
810         self.attr_filters += [[self.conf_attr, "name"]]
811
812     def assert_search_result(self, expected_num, expr, samdb, base_dn=None):
813
814         # Note dirsync must always search on the partition base DN
815         if base_dn is None:
816             base_dn = self.base_dn
817
818         # we need an extra filter for dirsync because:
819         # - we search on the base DN, so otherwise the '!' searches return
820         #   thousands of unrelated results, and
821         # - we make the test attribute preserve-on-delete in one case, so we
822         #   want to weed out results from any previous test runs
823         search = "(&{}{})".format(expr, self.extra_filter)
824
825         for attr in self.attr_filters:
826             res = samdb.search(base_dn, expression=search, scope=SCOPE_SUBTREE,
827                                attrs=attr, controls=self.dirsync)
828             self.assertTrue(len(res) == expected_num,
829                             "%u results, not %u for search %s, attr %s" %
830                             (len(res), expected_num, search, str(attr)))
831
832     def assert_attr_returned(self, expect_attr, samdb, attrs,
833                              no_result_ok=False):
834
835         # When using dirsync, the base DN we search on needs to be a naming
836         # context. Add an extra filter to ignore all the objects we aren't
837         # interested in
838         expr = self.single_obj_filter
839         res = samdb.search(self.base_dn, expression=expr, scope=SCOPE_SUBTREE,
840                            attrs=attrs, controls=self.dirsync)
841         self.assertTrue(len(res) == 1 or no_result_ok)
842
843         attr_returned = False
844         for msg in res:
845             if self.conf_attr in msg and len(msg[self.conf_attr]) > 0:
846                 attr_returned = True
847         self.assertEqual(expect_attr, attr_returned)
848
849     def assert_attr_visible(self, expect_attr, samdb=None):
850         if samdb is None:
851             samdb = self.ldb_user
852
853         # sanity-check confidential attribute is/isn't returned as expected
854         # based on the filter attributes we ask for
855         self.assert_attr_returned(expect_attr, samdb, attrs=None)
856         self.assert_attr_returned(expect_attr, samdb, attrs=["*"])
857
858         if expect_attr:
859             self.assert_attr_returned(expect_attr, samdb,
860                                       attrs=[self.conf_attr])
861         else:
862             # The behaviour with dirsync when asking solely for an attribute
863             # that you don't have rights to is a bit strange. Samba returns
864             # no result rather than an empty message with just the DN.
865             # Presumably this is due to dirsync module behaviour. It's not
866             # disclosive in that the DC behaves the same way as if you asked
867             # for a garbage/non-existent attribute
868             self.assert_attr_returned(expect_attr, samdb,
869                                       attrs=[self.conf_attr],
870                                       no_result_ok=True)
871             self.assert_attr_returned(expect_attr, samdb,
872                                       attrs=["garbage"], no_result_ok=True)
873
874         # filtering on a different attribute should never return the conf_attr
875         self.assert_attr_returned(expect_attr=False, samdb=samdb,
876                                   attrs=['name'])
877
878     def assert_negative_searches(self, has_rights_to=0,
879                                  dc_mode=DC_MODE_RETURN_NONE, samdb=None):
880         """Asserts user without rights cannot see objects in '!' searches"""
881
882         if samdb is None:
883             samdb = self.ldb_user
884
885         # because dirsync uses an extra filter, the total objects we expect
886         # here only includes the user objects (not the parent OU)
887         total_objects = len(self.all_users)
888         expected_results = self.negative_search_expected_results(has_rights_to,
889                                                                  dc_mode,
890                                                                  total_objects)
891
892         for search, expected_num in expected_results.items():
893             self.assert_search_result(expected_num, search, samdb)
894
895     def test_search_with_dirsync(self):
896         """Checks dirsync controls don't reveal confidential attributes"""
897
898         self.assert_conf_attr_searches(has_rights_to="all")
899         self.assert_attr_visible(expect_attr=True)
900         self.assert_negative_searches(has_rights_to="all")
901
902         # make the test attribute confidential and check user can't see it,
903         # even if they use the dirsync controls
904         self.make_attr_confidential()
905
906         self.assert_conf_attr_searches(has_rights_to=0)
907         self.assert_attr_visible(expect_attr=False)
908         dc_mode = self.guess_dc_mode()
909         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
910
911         # as a final sanity-check, make sure the admin can still see the attr
912         self.assert_conf_attr_searches(has_rights_to="all",
913                                        samdb=self.ldb_admin)
914         self.assert_attr_visible(expect_attr=True, samdb=self.ldb_admin)
915         self.assert_negative_searches(has_rights_to="all",
916                                       samdb=self.ldb_admin)
917
918     def get_guid(self, dn):
919         """Returns an object's GUID (in string format)"""
920         res = self.ldb_admin.search(base=dn, attrs=["objectGUID"],
921                                     scope=SCOPE_BASE)
922         guid = res[0]['objectGUID'][0]
923         return self.ldb_admin.schema_format_value("objectGUID", guid)
924
925     def make_attr_preserve_on_delete(self):
926         """Marks the attribute under test as being preserve on delete"""
927
928         # work out the original 'searchFlags' value before we overwrite it
929         search_flags = int(self.get_attr_search_flags(self.attr_dn))
930
931         # check we've already set the confidential flag
932         self.assertTrue(search_flags & SEARCH_FLAG_CONFIDENTIAL != 0)
933         search_flags |= SEARCH_FLAG_PRESERVEONDELETE
934
935         self.set_attr_search_flags(self.attr_dn, str(search_flags))
936
937     def change_attr_under_test(self, attr_name, attr_cn):
938         # change the attribute that the test code uses
939         self.conf_attr = attr_name
940         self.attr_dn = "{},{}".format(attr_cn, self.schema_dn)
941
942         # set the new attribute for the user-under-test
943         self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
944
945         # 2 other users also have the attribute-under-test set (to a randomish
946         # value). Set the new attribute for them now (normally this gets done
947         # in the setUp())
948         for username in self.all_users:
949             if "other-user" in username:
950                 dn = self.get_user_dn(username)
951                 self.add_attr(dn, self.conf_attr, "xyz-blah")
952
953     def test_search_with_dirsync_deleted_objects(self):
954         """Checks dirsync doesn't reveal confidential info for deleted objs"""
955
956         # change the attribute we're testing (we'll preserve on delete for this
957         # test case, which means the attribute-under-test hangs around after
958         # the test case finishes, and would interfere with the searches for
959         # subsequent other test cases)
960         self.change_attr_under_test("carLicense", "CN=carLicense")
961
962         # Windows dirsync behaviour is a little strange when you request
963         # attributes that deleted objects no longer have, so just request 'all
964         # attributes' to simplify the test logic
965         self.attr_filters = [None, ["*"]]
966
967         # normally dirsync uses extra filters to exclude deleted objects that
968         # we're not interested in. Override these filters so they WILL include
969         # deleted objects, but only from this particular test run. We can do
970         # this by matching lastKnownParent against this test case's OU, which
971         # will match any deleted child objects.
972         ou_guid = self.get_guid(self.ou)
973         deleted_filter = "(lastKnownParent=<GUID={}>)".format(ou_guid)
974
975         # the extra-filter will get combined via AND with the search expression
976         # we're testing, i.e. filter on the confidential attribute AND only
977         # include non-deleted objects, OR deleted objects from this test run
978         exclude_deleted_objs_filter = self.extra_filter
979         self.extra_filter = "(|{}{})".format(exclude_deleted_objs_filter,
980                                              deleted_filter)
981
982         # for matching on a single object, the search expresseion becomes:
983         # match exactly by account-name AND either a non-deleted object OR a
984         # deleted object from this test run
985         match_by_name = "(samaccountname={})".format(self.conf_user)
986         not_deleted = "(!(isDeleted=*))"
987         self.single_obj_filter = "(&{}(|{}{}))".format(match_by_name,
988                                                        not_deleted,
989                                                        deleted_filter)
990
991         # check that the search filters work as expected
992         self.assert_conf_attr_searches(has_rights_to="all")
993         self.assert_attr_visible(expect_attr=True)
994         self.assert_negative_searches(has_rights_to="all")
995
996         # make the test attribute confidential *and* preserve on delete.
997         self.make_attr_confidential()
998         self.make_attr_preserve_on_delete()
999
1000         # check we can't see the objects now, even with using dirsync controls
1001         self.assert_conf_attr_searches(has_rights_to=0)
1002         self.assert_attr_visible(expect_attr=False)
1003         dc_mode = self.guess_dc_mode()
1004         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
1005
1006         # now delete the users (except for the user whose LDB connection
1007         # we're currently using)
1008         for user in self.all_users:
1009             if user != self.user:
1010                 self.ldb_admin.delete(self.get_user_dn(user))
1011
1012         # check we still can't see the objects
1013         self.assert_conf_attr_searches(has_rights_to=0)
1014         self.assert_negative_searches(has_rights_to=0, dc_mode=dc_mode)
1015
1016 TestProgram(module=__name__, opts=subunitopts)