python:tests: Store keys as bytes rather than as lists of ints
[samba.git] / python / samba / tests / dsdb_schema_attributes.py
1 # -*- coding: utf-8 -*-
2 #
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 #
21 # Usage:
22 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
23 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN dsdb_schema_attributes
24 #
25
26 import time
27 import random
28
29 import samba.tests
30 import ldb
31 from ldb import SCOPE_BASE, LdbError
32
33
34 class SchemaAttributesTestCase(samba.tests.TestCase):
35
36     def setUp(self):
37         super(SchemaAttributesTestCase, self).setUp()
38
39         self.lp = samba.tests.env_loadparm()
40         self.samdb = samba.tests.connect_samdb(self.lp.samdb_url())
41
42         # fetch rootDSE
43         res = self.samdb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
44         self.assertEqual(len(res), 1)
45         self.schema_dn = res[0]["schemaNamingContext"][0]
46         self.base_dn = res[0]["defaultNamingContext"][0]
47         self.forest_level = int(res[0]["forestFunctionality"][0])
48
49     def tearDown(self):
50         super(SchemaAttributesTestCase, self).tearDown()
51
52     def _ldap_schemaUpdateNow(self):
53         ldif = """
54 dn:
55 changetype: modify
56 add: schemaUpdateNow
57 schemaUpdateNow: 1
58 """
59         self.samdb.modify_ldif(ldif)
60
61     def _make_obj_names(self, prefix):
62         obj_name = prefix + time.strftime("%s", time.gmtime())
63         obj_ldap_name = obj_name.replace("-", "")
64         obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn)
65         return (obj_name, obj_ldap_name, obj_dn)
66
67     def _make_attr_ldif(self, attr_name, attr_dn, sub_oid, extra=None):
68         ldif = """
69 dn: """ + attr_dn + """
70 objectClass: top
71 objectClass: attributeSchema
72 adminDescription: """ + attr_name + """
73 adminDisplayName: """ + attr_name + """
74 cn: """ + attr_name + """
75 attributeId: 1.3.6.1.4.1.7165.4.6.1.8.%d.""" % sub_oid + str(random.randint(1, 100000)) + """
76 attributeSyntax: 2.5.5.12
77 omSyntax: 64
78 instanceType: 4
79 isSingleValued: TRUE
80 systemOnly: FALSE
81 """
82
83         if extra is not None:
84             ldif += extra + "\n"
85
86         return ldif
87
88     def test_AddIndexedAttribute(self):
89         # create names for an attribute to add
90         (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("schemaAttributes-Attr-")
91         ldif = self._make_attr_ldif(attr_name, attr_dn, 1,
92                                     "searchFlags: %d" % samba.dsdb.SEARCH_FLAG_ATTINDEX)
93
94         # add the new attribute
95         self.samdb.add_ldif(ldif)
96         self._ldap_schemaUpdateNow()
97
98         # Check @ATTRIBUTES
99
100         attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)
101
102         self.assertIn(attr_ldap_name, attr_res[0])
103         self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
104         self.assertEqual(str(attr_res[0][attr_ldap_name][0]), "CASE_INSENSITIVE")
105
106         # Check @INDEXLIST
107
108         idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)
109
110         self.assertIn(attr_ldap_name, [str(x) for x in idx_res[0]["@IDXATTR"]])
111
112     def test_AddUnIndexedAttribute(self):
113         # create names for an attribute to add
114         (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("schemaAttributes-Attr-")
115         ldif = self._make_attr_ldif(attr_name, attr_dn, 2)
116
117         # add the new attribute
118         self.samdb.add_ldif(ldif)
119         self._ldap_schemaUpdateNow()
120
121         # Check @ATTRIBUTES
122
123         attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)
124
125         self.assertIn(attr_ldap_name, attr_res[0])
126         self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
127         self.assertEqual(str(attr_res[0][attr_ldap_name][0]), "CASE_INSENSITIVE")
128
129         # Check @INDEXLIST
130
131         idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)
132
133         self.assertNotIn(attr_ldap_name, [str(x) for x in idx_res[0]["@IDXATTR"]])
134
135     def test_AddTwoIndexedAttributes(self):
136         # create names for an attribute to add
137         (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("schemaAttributes-Attr-")
138         ldif = self._make_attr_ldif(attr_name, attr_dn, 3,
139                                     "searchFlags: %d" % samba.dsdb.SEARCH_FLAG_ATTINDEX)
140
141         # add the new attribute
142         self.samdb.add_ldif(ldif)
143         self._ldap_schemaUpdateNow()
144
145         # create names for an attribute to add
146         (attr_name2, attr_ldap_name2, attr_dn2) = self._make_obj_names("schemaAttributes-Attr-")
147         ldif = self._make_attr_ldif(attr_name2, attr_dn2, 4,
148                                     "searchFlags: %d" % samba.dsdb.SEARCH_FLAG_ATTINDEX)
149
150         # add the new attribute
151         self.samdb.add_ldif(ldif)
152         self._ldap_schemaUpdateNow()
153
154         # Check @ATTRIBUTES
155
156         attr_res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE)
157
158         self.assertIn(attr_ldap_name, attr_res[0])
159         self.assertEqual(len(attr_res[0][attr_ldap_name]), 1)
160         self.assertEqual(str(attr_res[0][attr_ldap_name][0]), "CASE_INSENSITIVE")
161
162         self.assertIn(attr_ldap_name2, attr_res[0])
163         self.assertEqual(len(attr_res[0][attr_ldap_name2]), 1)
164         self.assertEqual(str(attr_res[0][attr_ldap_name2][0]), "CASE_INSENSITIVE")
165
166         # Check @INDEXLIST
167
168         idx_res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE)
169
170         self.assertIn(attr_ldap_name, [str(x) for x in idx_res[0]["@IDXATTR"]])
171         self.assertIn(attr_ldap_name2, [str(x) for x in idx_res[0]["@IDXATTR"]])
172
173     def test_modify_at_attributes(self):
174         m = {"dn": "@ATTRIBUTES",
175              "@TEST_EXTRA": ["HIDDEN"]
176              }
177
178         msg = ldb.Message.from_dict(self.samdb, m, ldb.FLAG_MOD_ADD)
179         self.samdb.modify(msg)
180
181         res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE,
182                                 attrs=["@TEST_EXTRA"])
183         self.assertEqual(len(res), 1)
184         self.assertEqual(str(res[0].dn), "@ATTRIBUTES")
185         self.assertEqual(len(res[0]), 1)
186         self.assertTrue("@TEST_EXTRA" in res[0])
187         self.assertEqual(len(res[0]["@TEST_EXTRA"]), 1)
188         self.assertEqual(str(res[0]["@TEST_EXTRA"][0]), "HIDDEN")
189
190         samdb2 = samba.tests.connect_samdb(self.lp.samdb_url())
191
192         # We now only update the @ATTRIBUTES when a transaction happens
193         # rather than making a read of the DB do writes.
194         #
195         # This avoids locking issues and is more expected
196
197         samdb2.transaction_start()
198         samdb2.transaction_commit()
199
200         res = self.samdb.search(base="@ATTRIBUTES", scope=ldb.SCOPE_BASE,
201                                 attrs=["@TEST_EXTRA"])
202         self.assertEqual(len(res), 1)
203         self.assertEqual(str(res[0].dn), "@ATTRIBUTES")
204         self.assertEqual(len(res[0]), 0)
205         self.assertFalse("@TEST_EXTRA" in res[0])
206
207     def test_modify_at_indexlist(self):
208         m = {"dn": "@INDEXLIST",
209              "@TEST_EXTRA": ["1"]
210              }
211
212         msg = ldb.Message.from_dict(self.samdb, m, ldb.FLAG_MOD_ADD)
213         self.samdb.modify(msg)
214
215         res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE,
216                                 attrs=["@TEST_EXTRA"])
217         self.assertEqual(len(res), 1)
218         self.assertEqual(str(res[0].dn), "@INDEXLIST")
219         self.assertEqual(len(res[0]), 1)
220         self.assertTrue("@TEST_EXTRA" in res[0])
221         self.assertEqual(len(res[0]["@TEST_EXTRA"]), 1)
222         self.assertEqual(str(res[0]["@TEST_EXTRA"][0]), "1")
223
224         samdb2 = samba.tests.connect_samdb(self.lp.samdb_url())
225
226         # We now only update the @INDEXLIST when a transaction happens
227         # rather than making a read of the DB do writes.
228         #
229         # This avoids locking issues and is more expected
230
231         samdb2.transaction_start()
232         samdb2.transaction_commit()
233
234         res = self.samdb.search(base="@INDEXLIST", scope=ldb.SCOPE_BASE,
235                                 attrs=["@TEST_EXTRA"])
236         self.assertEqual(len(res), 1)
237         self.assertEqual(str(res[0].dn), "@INDEXLIST")
238         self.assertEqual(len(res[0]), 0)
239         self.assertFalse("@TEST_EXTRA" in res[0])
240
241     def test_modify_fail_of_at_indexlist(self):
242         m = {"dn": "@INDEXLIST",
243              "@TEST_NOT_EXTRA": ["1"]
244              }
245
246         msg = ldb.Message.from_dict(self.samdb, m, ldb.FLAG_MOD_DELETE)
247         try:
248             self.samdb.modify(msg)
249             self.fail("modify of @INDEXLIST with a failed constraint should fail")
250         except LdbError as err:
251             enum = err.args[0]
252             self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)