s4:ldap_schema.py - Move generated attributes check
[ira/wip.git] / source4 / lib / ldb / tests / python / ldap_schema.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
4
5 import getopt
6 import optparse
7 import sys
8 import time
9 import random
10 import base64
11 import os
12
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
21 from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
22 from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX
23 from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
24 from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
25 from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
26 from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
27 from ldb import Message, MessageElement, Dn
28 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
29 from samba import Ldb
30 from samba import UF_NORMAL_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT
31 from samba import UF_SERVER_TRUST_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT
32 from samba import UF_INTERDOMAIN_TRUST_ACCOUNT
33 from samba import UF_PASSWD_NOTREQD, UF_ACCOUNTDISABLE
34 from samba import GTYPE_SECURITY_BUILTIN_LOCAL_GROUP
35 from samba import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
36 from samba import GTYPE_SECURITY_UNIVERSAL_GROUP
37 from samba import GTYPE_DISTRIBUTION_GLOBAL_GROUP
38 from samba import GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
39 from samba import GTYPE_DISTRIBUTION_UNIVERSAL_GROUP
40 from samba import ATYPE_NORMAL_ACCOUNT, ATYPE_WORKSTATION_TRUST
41 from samba import ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_LOCAL_GROUP
42 from samba import ATYPE_SECURITY_UNIVERSAL_GROUP
43 from samba import ATYPE_DISTRIBUTION_GLOBAL_GROUP
44 from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
45 from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
46 from samba import DS_DC_FUNCTION_2003
47
48 from subunit import SubunitTestRunner
49 import unittest
50
51 from samba.ndr import ndr_pack, ndr_unpack
52 from samba.dcerpc import security
53
54 parser = optparse.OptionParser("ldap [options] <host>")
55 sambaopts = options.SambaOptions(parser)
56 parser.add_option_group(sambaopts)
57 parser.add_option_group(options.VersionOptions(parser))
58 # use command line creds if available
59 credopts = options.CredentialsOptions(parser)
60 parser.add_option_group(credopts)
61 opts, args = parser.parse_args()
62
63 if len(args) < 1:
64     parser.print_usage()
65     sys.exit(1)
66
67 host = args[0]
68
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
71
72
73 class SchemaTests(unittest.TestCase):
74     def delete_force(self, ldb, dn):
75         try:
76             ldb.delete(dn)
77         except LdbError, (num, _):
78             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
79
80     def find_schemadn(self, ldb):
81         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
82         self.assertEquals(len(res), 1)
83         return res[0]["schemaNamingContext"][0]
84
85     def find_basedn(self, ldb):
86         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
87                          attrs=["defaultNamingContext"])
88         self.assertEquals(len(res), 1)
89         return res[0]["defaultNamingContext"][0]
90
91     def setUp(self):
92         self.ldb = ldb
93         self.schema_dn = self.find_schemadn(ldb)
94         self.base_dn = self.find_basedn(ldb)
95
96     def test_generated_schema(self):
97         """Testing we can read the generated schema via LDAP"""
98         res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
99                 attrs=["objectClasses", "attributeTypes", "dITContentRules"])
100         self.assertEquals(len(res), 1)
101         self.assertTrue("dITContentRules" in res[0])
102         self.assertTrue("objectClasses" in res[0])
103         self.assertTrue("attributeTypes" in res[0])
104
105     def test_generated_schema_is_operational(self):
106         """Testing we don't get the generated schema via LDAP by default"""
107         res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
108                 attrs=["*"])
109         self.assertEquals(len(res), 1)
110         self.assertFalse("dITContentRules" in res[0])
111         self.assertFalse("objectClasses" in res[0])
112         self.assertFalse("attributeTypes" in res[0])
113
114     def test_schemaUpdateNow(self):
115         """Testing schemaUpdateNow"""
116         attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
117         attr_ldap_display_name = attr_name.replace("-", "")
118
119         ldif = """
120 dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
121 objectClass: top
122 objectClass: attributeSchema
123 adminDescription: """ + attr_name + """
124 adminDisplayName: """ + attr_name + """
125 cn: """ + attr_name + """
126 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
127 attributeSyntax: 2.5.5.12
128 omSyntax: 64
129 instanceType: 4
130 isSingleValued: TRUE
131 systemOnly: FALSE
132 """
133         self.ldb.add_ldif(ldif)
134
135         # Search for created attribute
136         res = []
137         res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
138         self.assertEquals(len(res), 1)
139         self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
140         self.assertTrue("schemaIDGUID" in res[0])
141
142         class_name = "test-Class" + time.strftime("%s", time.gmtime())
143         class_ldap_display_name = class_name.replace("-", "")
144
145         ldif = """
146 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
147 objectClass: top
148 objectClass: classSchema
149 adminDescription: """ + class_name + """
150 adminDisplayName: """ + class_name + """
151 cn: """ + class_name + """
152 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
153 instanceType: 4
154 objectClassCategory: 1
155 subClassOf: organizationalPerson
156 systemFlags: 16
157 rDNAttID: cn
158 systemMustContain: cn
159 systemMustContain: """ + attr_ldap_display_name + """
160 systemOnly: FALSE
161 """
162         self.ldb.add_ldif(ldif)
163
164         # Search for created objectclass
165         res = []
166         res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
167         self.assertEquals(len(res), 1)
168         self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
169         self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
170         self.assertTrue("schemaIDGUID" in res[0])
171
172         ldif = """
173 dn:
174 changetype: modify
175 add: schemaUpdateNow
176 schemaUpdateNow: 1
177 """
178         self.ldb.modify_ldif(ldif)
179
180         object_name = "obj" + time.strftime("%s", time.gmtime())
181
182         ldif = """
183 dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
184 objectClass: organizationalPerson
185 objectClass: person
186 objectClass: """ + class_ldap_display_name + """
187 objectClass: top
188 cn: """ + object_name + """
189 instanceType: 4
190 objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
191 distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
192 name: """ + object_name + """
193 """ + attr_ldap_display_name + """: test
194 """
195         self.ldb.add_ldif(ldif)
196
197         # Search for created object
198         res = []
199         res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
200         self.assertEquals(len(res), 1)
201         # Delete the object
202         self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
203
204
205 class SchemaTests_msDS_IntId(unittest.TestCase):
206
207     def setUp(self):
208         self.ldb = ldb
209         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
210         self.assertEquals(len(res), 1)
211         self.schema_dn = res[0]["schemaNamingContext"][0]
212         self.base_dn = res[0]["defaultNamingContext"][0]
213         self.forest_level = int(res[0]["forestFunctionality"][0])
214
215     def _ldap_schemaUpdateNow(self):
216         ldif = """
217 dn:
218 changetype: modify
219 add: schemaUpdateNow
220 schemaUpdateNow: 1
221 """
222         self.ldb.modify_ldif(ldif)
223
224     def _make_obj_names(self, prefix):
225         class_name = prefix + time.strftime("%s", time.gmtime())
226         class_ldap_name = class_name.replace("-", "")
227         class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
228         return (class_name, class_ldap_name, class_dn)
229
230     def _is_schema_base_object(self, ldb_msg):
231         """Test systemFlags for SYSTEM_FLAG_SCHEMA_BASE_OBJECT (16)"""
232         systemFlags = 0
233         if "systemFlags" in ldb_msg:
234             systemFlags = int(ldb_msg["systemFlags"][0])
235         return (systemFlags & 16) != 0
236
237     def _make_attr_ldif(self, attr_name, attr_dn):
238         ldif = """
239 dn: """ + attr_dn + """
240 objectClass: top
241 objectClass: attributeSchema
242 adminDescription: """ + attr_name + """
243 adminDisplayName: """ + attr_name + """
244 cn: """ + attr_name + """
245 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
246 attributeSyntax: 2.5.5.12
247 omSyntax: 64
248 instanceType: 4
249 isSingleValued: TRUE
250 systemOnly: FALSE
251 """
252         return ldif
253
254     def test_msDS_IntId_on_attr(self):
255         """Testing msDs-IntId creation for Attributes.
256         See MS-ADTS - 3.1.1.Attributes
257
258         This test should verify that:
259         - Creating attribute with 'msDS-IntId' fails with ERR_UNWILLING_TO_PERFORM
260         - Adding 'msDS-IntId' on existing attribute fails with ERR_CONSTRAINT_VIOLATION
261         - Creating attribute with 'msDS-IntId' set and FLAG_SCHEMA_BASE_OBJECT flag
262           set fails with ERR_UNWILLING_TO_PERFORM
263         - Attributes created with FLAG_SCHEMA_BASE_OBJECT not set have
264           'msDS-IntId' attribute added internally
265         """
266
267         # 1. Create attribute without systemFlags
268         # msDS-IntId should be created if forest functional
269         # level is >= DS_DC_FUNCTION_2003
270         # and missing otherwise
271         (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-1-")
272         ldif = self._make_attr_ldif(attr_name, attr_dn)
273
274         # try to add msDS-IntId during Attribute creation
275         ldif_fail = ldif + "msDS-IntId: -1993108831\n"
276         try:
277             self.ldb.add_ldif(ldif_fail)
278             self.fail("Adding attribute with preset msDS-IntId should fail")
279         except LdbError, (num, _):
280             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
281
282         # add the new attribute and update schema
283         self.ldb.add_ldif(ldif)
284         self._ldap_schemaUpdateNow()
285
286         # Search for created attribute
287         res = []
288         res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
289         self.assertEquals(len(res), 1)
290         self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
291         if self.forest_level >= DS_DC_FUNCTION_2003:
292             if self._is_schema_base_object(res[0]):
293                 self.assertTrue("msDS-IntId" not in res[0])
294             else:
295                 self.assertTrue("msDS-IntId" in res[0])
296         else:
297             self.assertTrue("msDS-IntId" not in res[0])
298
299         msg = Message()
300         msg.dn = Dn(self.ldb, attr_dn)
301         msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
302         try:
303             self.ldb.modify(msg)
304             self.fail("Modifying msDS-IntId should return error")
305         except LdbError, (num, _):
306             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
307
308         # 2. Create attribute with systemFlags = FLAG_SCHEMA_BASE_OBJECT
309         # msDS-IntId should be created if forest functional
310         # level is >= DS_DC_FUNCTION_2003
311         # and missing otherwise
312         (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-2-")
313         ldif = self._make_attr_ldif(attr_name, attr_dn)
314         ldif += "systemFlags: 16\n"
315
316         # try to add msDS-IntId during Attribute creation
317         ldif_fail = ldif + "msDS-IntId: -1993108831\n"
318         try:
319             self.ldb.add_ldif(ldif_fail)
320             self.fail("Adding attribute with preset msDS-IntId should fail")
321         except LdbError, (num, _):
322             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
323
324         # add the new attribute and update schema
325         self.ldb.add_ldif(ldif)
326         self._ldap_schemaUpdateNow()
327
328         # Search for created attribute
329         res = []
330         res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
331         self.assertEquals(len(res), 1)
332         self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
333         if self.forest_level >= DS_DC_FUNCTION_2003:
334             if self._is_schema_base_object(res[0]):
335                 self.assertTrue("msDS-IntId" not in res[0])
336             else:
337                 self.assertTrue("msDS-IntId" in res[0])
338         else:
339             self.assertTrue("msDS-IntId" not in res[0])
340
341         msg = Message()
342         msg.dn = Dn(self.ldb, attr_dn)
343         msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
344         try:
345             self.ldb.modify(msg)
346             self.fail("Modifying msDS-IntId should return error")
347         except LdbError, (num, _):
348             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
349
350
351     def _make_class_ldif(self, class_dn, class_name):
352         ldif = """
353 dn: """ + class_dn + """
354 objectClass: top
355 objectClass: classSchema
356 adminDescription: """ + class_name + """
357 adminDisplayName: """ + class_name + """
358 cn: """ + class_name + """
359 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
360 instanceType: 4
361 objectClassCategory: 1
362 subClassOf: organizationalPerson
363 rDNAttID: cn
364 systemMustContain: cn
365 systemOnly: FALSE
366 """
367         return ldif
368
369     def test_msDS_IntId_on_class(self):
370         """Testing msDs-IntId creation for Class
371            Reference: MS-ADTS - 3.1.1.2.4.8 Class classSchema"""
372
373         # 1. Create Class without systemFlags
374         # msDS-IntId should be created if forest functional
375         # level is >= DS_DC_FUNCTION_2003
376         # and missing otherwise
377         (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-1-")
378         ldif = self._make_class_ldif(class_dn, class_name)
379
380         # try to add msDS-IntId during Class creation
381         ldif_add = ldif + "msDS-IntId: -1993108831\n"
382         self.ldb.add_ldif(ldif_add)
383         self._ldap_schemaUpdateNow()
384
385         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
386         self.assertEquals(len(res), 1)
387         self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
388
389         # add a new Class and update schema
390         (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-2-")
391         ldif = self._make_class_ldif(class_dn, class_name)
392
393         self.ldb.add_ldif(ldif)
394         self._ldap_schemaUpdateNow()
395
396         # Search for created Class
397         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
398         self.assertEquals(len(res), 1)
399         self.assertFalse("msDS-IntId" in res[0])
400
401         msg = Message()
402         msg.dn = Dn(self.ldb, class_dn)
403         msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
404         try:
405             self.ldb.modify(msg)
406             self.fail("Modifying msDS-IntId should return error")
407         except LdbError, (num, _):
408             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
409
410         # 2. Create Class with systemFlags = FLAG_SCHEMA_BASE_OBJECT
411         # msDS-IntId should be created if forest functional
412         # level is >= DS_DC_FUNCTION_2003
413         # and missing otherwise
414         (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-3-")
415         ldif = self._make_class_ldif(class_dn, class_name)
416         ldif += "systemFlags: 16\n"
417
418         # try to add msDS-IntId during Class creation
419         ldif_add = ldif + "msDS-IntId: -1993108831\n"
420         self.ldb.add_ldif(ldif_add)
421
422         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
423         self.assertEquals(len(res), 1)
424         self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
425
426         # add the new Class and update schema
427         (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-4-")
428         ldif = self._make_class_ldif(class_dn, class_name)
429         ldif += "systemFlags: 16\n"
430
431         self.ldb.add_ldif(ldif)
432         self._ldap_schemaUpdateNow()
433
434         # Search for created Class
435         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
436         self.assertEquals(len(res), 1)
437         self.assertFalse("msDS-IntId" in res[0])
438
439         msg = Message()
440         msg.dn = Dn(self.ldb, class_dn)
441         msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
442         try:
443             self.ldb.modify(msg)
444             self.fail("Modifying msDS-IntId should return error")
445         except LdbError, (num, _):
446             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
447         res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
448         self.assertEquals(len(res), 1)
449         self.assertFalse("msDS-IntId" in res[0])
450
451
452     def test_verify_msDS_IntId(self):
453         """Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
454         count = 0
455         res = self.ldb.search(self.schema_dn, scope=SCOPE_ONELEVEL,
456                               expression="objectClass=attributeSchema",
457                               attrs=["systemFlags", "msDS-IntId", "attributeID", "cn"])
458         self.assertTrue(len(res) > 1)
459         for ldb_msg in res:
460             if self.forest_level >= DS_DC_FUNCTION_2003:
461                 if self._is_schema_base_object(ldb_msg):
462                     self.assertTrue("msDS-IntId" not in ldb_msg)
463                 else:
464                     # don't assert here as there are plenty of
465                     # attributes under w2k8 that are not part of
466                     # Base Schema (SYSTEM_FLAG_SCHEMA_BASE_OBJECT flag not set)
467                     # has not msDS-IntId attribute set
468                     #self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
469                     if "msDS-IntId" not in ldb_msg:
470                         count = count + 1
471                         print "%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"])
472             else:
473                 self.assertTrue("msDS-IntId" not in ldb_msg)
474
475
476 if not "://" in host:
477     if os.path.isfile(host):
478         host = "tdb://%s" % host
479     else:
480         host = "ldap://%s" % host
481
482 ldb_options = []
483 if host.startswith("ldap://"):
484     # user 'paged_search' module when connecting remotely
485     ldb_options = ["modules:paged_searches"]
486
487 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
488 if not "tdb://" in host:
489     gc_ldb = Ldb("%s:3268" % host, credentials=creds,
490                  session_info=system_session(), lp=lp)
491 else:
492     gc_ldb = None
493
494 runner = SubunitTestRunner()
495 rc = 0
496 if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
497     rc = 1
498 if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():
499     rc = 1
500 sys.exit(rc)