2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
10 sys.path.insert(0, "bin/python")
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
14 import samba.getopt as options
16 from samba.auth import system_session
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
21 from samba.dcerpc import drsuapi, misc, drsblobs
22 from samba.drs_utils import drs_DsBind
23 from samba.ndr import ndr_unpack, ndr_pack
29 class LATestException(Exception):
33 class LATests(drs_base.DrsBaseTestCase):
36 super(LATests, self).setUp()
37 # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
38 # we're only using one
39 self.samdb = self.ldb_dc1
41 self.base_dn = self.samdb.domain_dn()
42 self.ou = "OU=la,%s" % self.base_dn
45 self.samdb.delete(self.ou, ['tree_delete:1'])
46 except ldb.LdbError as e:
48 self.samdb.add({'objectclass': 'organizationalUnit',
51 self.dc_guid = self.samdb.get_invocation_id()
52 self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)
55 super(LATests, self).tearDown()
57 self.samdb.delete(self.ou, ['tree_delete:1'])
58 except ldb.LdbError as e:
61 def delete_user(self, user):
62 self.samdb.delete(user['dn'])
63 del self.users[self.users.index(user)]
65 def add_object(self, cn, objectclass):
66 dn = "CN=%s,%s" % (cn, self.ou)
67 self.samdb.add({'cn': cn,
68 'objectclass': objectclass,
73 def add_objects(self, n, objectclass, prefix=None):
78 dns.append(self.add_object("%s%d" % (prefix, i + 1),
82 def add_linked_attribute(self, src, dest, attr='member'):
84 m.dn = ldb.Dn(self.samdb, src)
85 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
88 def remove_linked_attribute(self, src, dest, attr='member'):
90 m.dn = ldb.Dn(self.samdb, src)
91 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
94 def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
96 req8 = self._exop_req8(dest_dsa=None,
97 invocation_id=self.dc_guid,
99 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
101 level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
102 expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
105 for link in ctr.linked_attributes:
106 if link.attid == expected_attid:
107 unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
109 active = link.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
110 links.append((str(unpacked.dn), bool(active)))
115 def assert_forward_links(self, obj, expected, attr='member'):
116 results = self.attr_search(obj, expected, attr)
117 self.assertEqual(len(results), len(expected))
120 self.assertTrue(k in expected)
121 self.assertEqual(expected[k], v, "%s active flag should be %d, not %d" %
124 def get_object_guid(self, dn):
125 res = self.samdb.search(dn,
126 scope=ldb.SCOPE_BASE,
127 attrs=['objectGUID'])
128 return str(misc.GUID(res[0]['objectGUID'][0]))
130 def test_links_all_delete_group(self):
131 u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
132 g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
133 g2guid = self.get_object_guid(g2)
135 self.add_linked_attribute(g1, u1)
136 self.add_linked_attribute(g2, u1)
137 self.add_linked_attribute(g2, u2)
139 self.samdb.delete(g2)
140 self.assert_forward_links(g1, {u1: True})
141 res = self.samdb.search('<GUID=%s>' % g2guid,
142 scope=ldb.SCOPE_BASE,
143 controls=['show_deleted:1'])
145 self.assert_forward_links(new_dn, {})
148 def test_la_links_delete_link(self):
149 u1, u2 = self.add_objects(2, 'user', 'u_del_link')
150 g1, g2 = self.add_objects(2, 'group', 'g_del_link')
152 self.add_linked_attribute(g1, u1)
153 self.add_linked_attribute(g2, u1)
154 self.add_linked_attribute(g2, u2)
156 self.remove_linked_attribute(g2, u1)
158 self.assert_forward_links(g1, {u1: True})
159 self.assert_forward_links(g2, {u1: False, u2: True})
161 self.add_linked_attribute(g2, u1)
162 self.remove_linked_attribute(g2, u2)
163 self.assert_forward_links(g2, {u1: True, u2: False})
164 self.remove_linked_attribute(g2, u1)
165 self.assert_forward_links(g2, {u1: False, u2: False})
167 def test_la_links_delete_user(self):
168 u1, u2 = self.add_objects(2, 'user', 'u_del_user')
169 g1, g2 = self.add_objects(2, 'group', 'g_del_user')
171 self.add_linked_attribute(g1, u1)
172 self.add_linked_attribute(g2, u1)
173 self.add_linked_attribute(g2, u2)
175 self.samdb.delete(u1)
177 self.assert_forward_links(g1, {})
178 self.assert_forward_links(g2, {u2: True})