3a08a1a1b6513408d39721108fb9be1f49ac96d3
[kai/samba-autobuild/.git] / source4 / torture / drs / python / linked_attributes_drs.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import sys
5 import os
6 import base64
7 import random
8 import re
9
10 sys.path.insert(0, "bin/python")
11 import samba
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15
16 from samba.auth import system_session
17 import ldb
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
20
21 from samba.dcerpc import drsuapi, misc, drsblobs
22 from samba.drs_utils import drs_DsBind
23 from samba.ndr import ndr_unpack, ndr_pack
24
25 import drs_base
26
27 import time
28
29
class LATestException(Exception):
    """Exception type for the linked-attribute DRS tests."""
32
33
34 class LATests(drs_base.DrsBaseTestCase):
35
36     def setUp(self):
37         super(LATests, self).setUp()
38         # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
39         # we're only using one
40         self.samdb = self.ldb_dc1
41
42         self.base_dn = self.samdb.domain_dn()
43         self.ou = "OU=la,%s" % self.base_dn
44         if True:
45             try:
46                 self.samdb.delete(self.ou, ['tree_delete:1'])
47             except ldb.LdbError as e:
48                 pass
49         self.samdb.add({'objectclass': 'organizationalUnit',
50                         'dn': self.ou})
51
52         self.dc_guid = self.samdb.get_invocation_id()
53         self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)
54
55     def tearDown(self):
56         super(LATests, self).tearDown()
57         try:
58             self.samdb.delete(self.ou, ['tree_delete:1'])
59         except ldb.LdbError as e:
60             pass
61
62     def delete_user(self, user):
63         self.samdb.delete(user['dn'])
64         del self.users[self.users.index(user)]
65
66     def add_object(self, cn, objectclass):
67         dn = "CN=%s,%s" % (cn, self.ou)
68         self.samdb.add({'cn': cn,
69                         'objectclass': objectclass,
70                         'dn': dn})
71
72         return dn
73
74     def add_objects(self, n, objectclass, prefix=None):
75         if prefix is None:
76             prefix = objectclass
77         dns = []
78         for i in range(n):
79             dns.append(self.add_object("%s%d" % (prefix, i + 1),
80                                        objectclass))
81         return dns
82
83     def add_linked_attribute(self, src, dest, attr='member'):
84         m = ldb.Message()
85         m.dn = ldb.Dn(self.samdb, src)
86         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
87         self.samdb.modify(m)
88
89     def remove_linked_attribute(self, src, dest, attr='member'):
90         m = ldb.Message()
91         m.dn = ldb.Dn(self.samdb, src)
92         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
93         self.samdb.modify(m)
94
95     def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
96
97         req8 = self._exop_req8(dest_dsa=None,
98                                invocation_id=self.dc_guid,
99                                nc_dn_str=obj,
100                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
101
102         level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
103         expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
104
105         links = []
106         for link in ctr.linked_attributes:
107             if link.attid == expected_attid:
108                 unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
109                                       link.value.blob)
110                 active = link.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
111                 links.append((str(unpacked.dn), bool(active)))
112
113         return links
114
115     def assert_forward_links(self, obj, expected, attr='member'):
116         results = self.attr_search(obj, expected, attr)
117         self.assertEqual(len(results), len(expected))
118
119         for k, v in results:
120             self.assertTrue(k in expected) 
121             self.assertEqual(expected[k], v, "%s active flag should be %d, not %d" %
122                              (k, expected[k], v))
123
124     def get_object_guid(self, dn):
125         res = self.samdb.search(dn,
126                                 scope=ldb.SCOPE_BASE,
127                                 attrs=['objectGUID'])
128         return str(misc.GUID(res[0]['objectGUID'][0]))
129
130     def test_links_all_delete_group(self):
131         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
132         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
133         g2guid = self.get_object_guid(g2)
134
135         self.add_linked_attribute(g1, u1)
136         self.add_linked_attribute(g2, u1)
137         self.add_linked_attribute(g2, u2)
138
139         self.samdb.delete(g2)
140         self.assert_forward_links(g1, {u1: True})
141         res = self.samdb.search('<GUID=%s>' % g2guid,
142                                 scope=ldb.SCOPE_BASE,
143                                 controls=['show_deleted:1'])
144         new_dn = res[0].dn
145         self.assert_forward_links(new_dn, {})
146
147     def test_la_links_delete_link(self):
148         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
149         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
150
151         self.add_linked_attribute(g1, u1)
152         self.add_linked_attribute(g2, u1)
153         self.add_linked_attribute(g2, u2)
154
155         self.remove_linked_attribute(g2, u1)
156
157         self.assert_forward_links(g1, {u1: True})
158         self.assert_forward_links(g2, {u1: False, u2: True})
159
160         self.add_linked_attribute(g2, u1)
161         self.remove_linked_attribute(g2, u2)
162         self.assert_forward_links(g2, {u1: True, u2: False})
163         self.remove_linked_attribute(g2, u1)
164         self.assert_forward_links(g2, {u1: False, u2: False})
165
166     def test_la_links_delete_user(self):
167         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
168         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
169
170         self.add_linked_attribute(g1, u1)
171         self.add_linked_attribute(g2, u1)
172         self.add_linked_attribute(g2, u2)
173
174         self.samdb.delete(u1)
175
176         self.assert_forward_links(g1, {})
177         self.assert_forward_links(g2, {u2: True})