dsdb tests: add linked attribute tests
authorDouglas Bagnall <douglas.bagnall@catalyst.net.nz>
Thu, 30 Jun 2016 04:35:08 +0000 (16:35 +1200)
committerGarming Sam <garming@samba.org>
Fri, 15 Jul 2016 08:01:28 +0000 (10:01 +0200)
Note that this test will not work properly across ldap as the
marked-deleted linked attributes will not appear.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/dsdb/tests/python/linked_attributes.py [new file with mode: 0644]
source4/selftest/tests.py

diff --git a/source4/dsdb/tests/python/linked_attributes.py b/source4/dsdb/tests/python/linked_attributes.py
new file mode 100644 (file)
index 0000000..c7afad4
--- /dev/null
@@ -0,0 +1,363 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Originally based on ./sam.py
+import optparse
+import sys
+import os
+import base64
+import random
+import re
+
+sys.path.insert(0, "bin/python")
+import samba
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+
+import samba.getopt as options
+
+from samba.auth import system_session
+import ldb
+from samba.samdb import SamDB
+from samba.dcerpc import misc
+
+import time
+
+parser = optparse.OptionParser("linked_attributes.py [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
+
+parser.add_option('--delete-in-setup', action='store_true',
+                  help="cleanup in setup")
+
+parser.add_option('--no-cleanup', action='store_true',
+                  help="don't cleanup in teardown")
+
+parser.add_option('--no-reveal-internals', action='store_true',
+                  help="Only use windows compatible ldap controls")
+
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+    parser.print_usage()
+    sys.exit(1)
+
+host = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+
+class LATestException(Exception):
+    pass
+
+
+class LATests(samba.tests.TestCase):
+
+    def setUp(self):
+        super(LATests, self).setUp()
+        self.samdb = SamDB(host, credentials=creds,
+                         session_info=system_session(lp), lp=lp)
+
+        self.base_dn = self.samdb.domain_dn()
+        self.ou = "OU=la,%s" % self.base_dn
+        if opts.delete_in_setup:
+            try:
+                self.samdb.delete(self.ou, ['tree_delete:1'])
+            except ldb.LdbError, e:
+                print "tried deleting %s, got error %s" % (self.ou, e)
+        self.samdb.add({'objectclass': 'organizationalUnit',
+                        'dn': self.ou})
+
+    def tearDown(self):
+        super(LATests, self).tearDown()
+        if not opts.no_cleanup:
+            self.samdb.delete(self.ou, ['tree_delete:1'])
+
+    def delete_user(self, user):
+        self.samdb.delete(user['dn'])
+        del self.users[self.users.index(user)]
+
+    def add_object(self, cn, objectclass):
+        dn = "CN=%s,%s" % (cn, self.ou)
+        self.samdb.add({'cn': cn,
+                      'objectclass': objectclass,
+                      'dn': dn})
+
+        return dn
+
+    def add_objects(self, n, objectclass, prefix=None):
+        if prefix is None:
+            prefix = objectclass
+        dns = []
+        for i in range(n):
+            dns.append(self.add_object("%s%d" % (prefix, i + 1),
+                                       objectclass))
+        return dns
+
+    def add_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.samdb, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
+        self.samdb.modify(m)
+
+    def remove_linked_attribute(self, src, dest, attr='member'):
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.samdb, src)
+        m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
+        self.samdb.modify(m)
+
+    def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE,
+                    **controls):
+        if opts.no_reveal_internals:
+            if 'reveal_internals' in controls:
+                del controls['reveal_internals']
+
+        controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
+
+        res = self.samdb.search(obj,
+                                scope=scope,
+                                attrs=[attr],
+                                controls=controls)
+        return res
+
+    def assert_links(self, obj, expected, attr, sorted=False, msg='',
+                     **kwargs):
+        res = self.attr_search(obj, expected, attr, **kwargs)
+
+        if len(expected) == 0:
+            if attr in res[0]:
+                self.fail("found attr '%s' in %s" % (attr, res[0]))
+            return
+
+        try:
+            results = list([x[attr] for x in res][0])
+        except KeyError:
+            self.fail("missing attr '%s' on %s" % (attr, obj))
+
+        if sorted == False:
+            results = set(results)
+            expected = set(expected)
+
+        if expected != results:
+            print msg
+            print "expected %s" % expected
+            print "received %s" % results
+
+        self.assertEqual(results, expected)
+
+    def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
+        self.assert_links(obj, expected, attr=attr,
+                          msg='back links do not match', **kwargs)
+
+    def assert_forward_links(self, obj, expected, attr='member', **kwargs):
+        self.assert_links(obj, expected, attr=attr,
+                          msg='forward links do not match', **kwargs)
+
+    def get_object_guid(self, dn):
+        res = self.samdb.search(dn,
+                                scope=ldb.SCOPE_BASE,
+                                attrs=['objectGUID'])
+        return str(misc.GUID(res[0]['objectGUID'][0]))
+
+    def _test_la_backlinks(self, reveal=False):
+        tag = 'backlinks'
+        kwargs = {}
+        if reveal:
+            tag += '_reveal'
+            kwargs = {'reveal_internals': 0}
+
+        u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
+        g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.assert_back_links(u1, [g1, g2], **kwargs)
+        self.assert_back_links(u2, [g2], **kwargs)
+
+    def test_la_backlinks(self):
+        self._test_la_backlinks()
+
+    def test_la_backlinks_reveal(self):
+        if opts.no_reveal_internals:
+            print 'skipping because --no-reveal-internals'
+            return
+        self._test_la_backlinks(True)
+
+    def _test_la_backlinks_delete_group(self, reveal=False):
+        tag = 'del_group'
+        kwargs = {}
+        if reveal:
+            tag += '_reveal'
+            kwargs = {'reveal_internals': 0}
+
+        u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
+        g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(g2, ['tree_delete:1'])
+
+        self.assert_back_links(u1, [g1], **kwargs)
+        self.assert_back_links(u2, set(), **kwargs)
+
+    def test_la_backlinks_delete_group(self):
+        self._test_la_backlinks_delete_group()
+
+    def test_la_backlinks_delete_group_reveal(self):
+        if opts.no_reveal_internals:
+            print 'skipping because --no-reveal-internals'
+            return
+        self._test_la_backlinks_delete_group(True)
+
+    def test_links_all_delete_group(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
+        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
+        g2guid = self.get_object_guid(g2)
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(g2)
+        self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
+                               show_deactivated_link=0)
+        self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
+                               show_deactivated_link=0)
+        self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0)
+        self.assert_forward_links('<GUID=%s>' % g2guid,
+                                  [], show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0)
+
+    def test_links_all_delete_group_reveal(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
+        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
+        g2guid = self.get_object_guid(g2)
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(g2)
+        self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
+                               show_deactivated_link=0,
+                                  reveal_internals=0)
+        self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
+                               show_deactivated_link=0,
+                                  reveal_internals=0)
+        self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0,
+                                  reveal_internals=0)
+        self.assert_forward_links('<GUID=%s>' % g2guid,
+                                  [], show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0,
+                                  reveal_internals=0)
+
+    def test_la_links_delete_link(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_link')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_link')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.remove_linked_attribute(g2, u1)
+
+        self.assert_forward_links(g1, [u1])
+        self.assert_forward_links(g2, [u2])
+
+        self.add_linked_attribute(g2, u1)
+        self.assert_forward_links(g2, [u1, u2])
+        self.remove_linked_attribute(g2, u2)
+        self.assert_forward_links(g2, [u1])
+        self.remove_linked_attribute(g2, u1)
+        self.assert_forward_links(g2, [])
+
+    def test_la_links_delete_link_reveal(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.remove_linked_attribute(g2, u1)
+
+        self.assert_forward_links(g2, [u1, u2], show_deleted=1,
+                                  show_recycled=1,
+                                  show_deactivated_link=0,
+                                  reveal_internals=0
+        )
+
+    def test_la_links_delete_user(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_user')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_user')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(u1)
+
+        self.assert_forward_links(g1, [])
+        self.assert_forward_links(g2, [u2])
+
+    def test_la_links_delete_user_reveal(self):
+        u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
+        g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
+
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+
+        self.samdb.delete(u1)
+
+        self.assert_forward_links(g2, [u2],
+                                  show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0,
+                                  reveal_internals=0)
+        self.assert_forward_links(g1, [],
+                                  show_deleted=1, show_recycled=1,
+                                  show_deactivated_link=0,
+                                  reveal_internals=0)
+
+    def _test_la_links_sort_order(self):
+        u1, u2, u3 = self.add_objects(3, 'user', 'u_sort_order')
+        g1, g2, g3 = self.add_objects(3, 'group', 'g_sort_order')
+
+        # Add these in a haphazard order
+        self.add_linked_attribute(g2, u3)
+        self.add_linked_attribute(g3, u2)
+        self.add_linked_attribute(g1, u3)
+        self.add_linked_attribute(g1, u1)
+        self.add_linked_attribute(g2, u1)
+        self.add_linked_attribute(g2, u2)
+        self.add_linked_attribute(g3, u3)
+        self.add_linked_attribute(g3, u1)
+
+        self.assert_forward_links(g1, [u3, u1], sorted=True)
+        self.assert_forward_links(g2, [u3, u2, u1], sorted=True)
+        self.assert_forward_links(g3, [u3, u2, u1], sorted=True)
+
+        self.assert_back_links(u1, [g3, g2, g1], sorted=True)
+        self.assert_back_links(u2, [g3, g2], sorted=True)
+        self.assert_back_links(u3, [g3, g2, g1], sorted=True)
+
+
+if "://" not in host:
+    if os.path.isfile(host):
+        host = "tdb://%s" % host
+    else:
+        host = "ldap://%s" % host
+
+
+TestProgram(module=__name__, opts=subunitopts)
index 1b1394017a7f7da1a3fb2bf13a21bb4e53197837..01cb87b1db3ec328157012ee82f5969b9444e571 100755 (executable)
@@ -583,6 +583,7 @@ plantestsuite_loadlist("samba4.ldap.sites.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [
 
 plantestsuite_loadlist("samba4.ldap.sort.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sort.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 plantestsuite_loadlist("samba4.ldap.vlv.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/vlv.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
+plantestsuite_loadlist("samba4.ldap.linked_attributes.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/linked_attributes.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 
 for env in ["ad_dc_ntvfs", "fl2000dc", "fl2003dc", "fl2008r2dc"]:
     plantestsuite_loadlist("samba4.ldap_schema.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap_schema.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])