2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
16 import samba.getopt as options
18 from samba.auth import system_session
19 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
21 from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
22 from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX
23 from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
24 from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
25 from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
26 from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
27 from ldb import Message, MessageElement, Dn
28 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
30 from samba import UF_NORMAL_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT
31 from samba import UF_SERVER_TRUST_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT
32 from samba import UF_INTERDOMAIN_TRUST_ACCOUNT
33 from samba import UF_PASSWD_NOTREQD, UF_ACCOUNTDISABLE
34 from samba import GTYPE_SECURITY_BUILTIN_LOCAL_GROUP
35 from samba import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
36 from samba import GTYPE_SECURITY_UNIVERSAL_GROUP
37 from samba import GTYPE_DISTRIBUTION_GLOBAL_GROUP
38 from samba import GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
39 from samba import GTYPE_DISTRIBUTION_UNIVERSAL_GROUP
40 from samba import ATYPE_NORMAL_ACCOUNT, ATYPE_WORKSTATION_TRUST
41 from samba import ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_LOCAL_GROUP
42 from samba import ATYPE_SECURITY_UNIVERSAL_GROUP
43 from samba import ATYPE_DISTRIBUTION_GLOBAL_GROUP
44 from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
45 from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
46 from samba import DS_DC_FUNCTION_2003
48 from subunit import SubunitTestRunner
51 from samba.ndr import ndr_pack, ndr_unpack
52 from samba.dcerpc import security
54 parser = optparse.OptionParser("ldap [options] <host>")
55 sambaopts = options.SambaOptions(parser)
56 parser.add_option_group(sambaopts)
57 parser.add_option_group(options.VersionOptions(parser))
58 # use command line creds if available
59 credopts = options.CredentialsOptions(parser)
60 parser.add_option_group(credopts)
61 opts, args = parser.parse_args()
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
73 class SchemaTests(unittest.TestCase):
74 def delete_force(self, ldb, dn):
77 except LdbError, (num, _):
78 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
80 def find_schemadn(self, ldb):
81 res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
82 self.assertEquals(len(res), 1)
83 return res[0]["schemaNamingContext"][0]
85 def find_basedn(self, ldb):
86 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
87 attrs=["defaultNamingContext"])
88 self.assertEquals(len(res), 1)
89 return res[0]["defaultNamingContext"][0]
93 self.schema_dn = self.find_schemadn(ldb)
94 self.base_dn = self.find_basedn(ldb)
96 def test_generated_schema(self):
97 """Testing we can read the generated schema via LDAP"""
98 res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
99 attrs=["objectClasses", "attributeTypes", "dITContentRules"])
100 self.assertEquals(len(res), 1)
101 self.assertTrue("dITContentRules" in res[0])
102 self.assertTrue("objectClasses" in res[0])
103 self.assertTrue("attributeTypes" in res[0])
105 def test_generated_schema_is_operational(self):
106 """Testing we don't get the generated schema via LDAP by default"""
107 res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
109 self.assertEquals(len(res), 1)
110 self.assertFalse("dITContentRules" in res[0])
111 self.assertFalse("objectClasses" in res[0])
112 self.assertFalse("attributeTypes" in res[0])
114 def test_schemaUpdateNow(self):
115 """Testing schemaUpdateNow"""
116 attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
117 attr_ldap_display_name = attr_name.replace("-", "")
120 dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
122 objectClass: attributeSchema
123 adminDescription: """ + attr_name + """
124 adminDisplayName: """ + attr_name + """
125 cn: """ + attr_name + """
126 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
127 attributeSyntax: 2.5.5.12
133 self.ldb.add_ldif(ldif)
135 # Search for created attribute
137 res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
138 self.assertEquals(len(res), 1)
139 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
140 self.assertTrue("schemaIDGUID" in res[0])
142 class_name = "test-Class" + time.strftime("%s", time.gmtime())
143 class_ldap_display_name = class_name.replace("-", "")
145 # First try to create a class with a wrong "defaultObjectCategory"
147 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
149 objectClass: classSchema
150 defaultObjectCategory: CN=_
151 adminDescription: """ + class_name + """
152 adminDisplayName: """ + class_name + """
153 cn: """ + class_name + """
154 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
156 objectClassCategory: 1
157 subClassOf: organizationalPerson
160 systemMustContain: cn
161 systemMustContain: """ + attr_ldap_display_name + """
165 self.ldb.add_ldif(ldif)
167 except LdbError, (num, _):
168 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
171 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
173 objectClass: classSchema
174 adminDescription: """ + class_name + """
175 adminDisplayName: """ + class_name + """
176 cn: """ + class_name + """
177 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
179 objectClassCategory: 1
180 subClassOf: organizationalPerson
183 systemMustContain: cn
184 systemMustContain: """ + attr_ldap_display_name + """
187 self.ldb.add_ldif(ldif)
189 # Search for created objectclass
191 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
192 self.assertEquals(len(res), 1)
193 self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
194 self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
195 self.assertTrue("schemaIDGUID" in res[0])
203 self.ldb.modify_ldif(ldif)
205 object_name = "obj" + time.strftime("%s", time.gmtime())
208 dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
209 objectClass: organizationalPerson
211 objectClass: """ + class_ldap_display_name + """
213 cn: """ + object_name + """
215 objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
216 distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
217 name: """ + object_name + """
218 """ + attr_ldap_display_name + """: test
220 self.ldb.add_ldif(ldif)
222 # Search for created object
224 res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
225 self.assertEquals(len(res), 1)
227 self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
230 class SchemaTests_msDS_IntId(unittest.TestCase):
234 res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
235 self.assertEquals(len(res), 1)
236 self.schema_dn = res[0]["schemaNamingContext"][0]
237 self.base_dn = res[0]["defaultNamingContext"][0]
238 self.forest_level = int(res[0]["forestFunctionality"][0])
240 def _ldap_schemaUpdateNow(self):
247 self.ldb.modify_ldif(ldif)
249 def _make_obj_names(self, prefix):
250 class_name = prefix + time.strftime("%s", time.gmtime())
251 class_ldap_name = class_name.replace("-", "")
252 class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
253 return (class_name, class_ldap_name, class_dn)
255 def _is_schema_base_object(self, ldb_msg):
256 """Test systemFlags for SYSTEM_FLAG_SCHEMA_BASE_OBJECT (16)"""
258 if "systemFlags" in ldb_msg:
259 systemFlags = int(ldb_msg["systemFlags"][0])
260 return (systemFlags & 16) != 0
262 def _make_attr_ldif(self, attr_name, attr_dn):
264 dn: """ + attr_dn + """
266 objectClass: attributeSchema
267 adminDescription: """ + attr_name + """
268 adminDisplayName: """ + attr_name + """
269 cn: """ + attr_name + """
270 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
271 attributeSyntax: 2.5.5.12
279 def test_msDS_IntId_on_attr(self):
280 """Testing msDs-IntId creation for Attributes.
281 See MS-ADTS - 3.1.1.Attributes
283 This test should verify that:
284 - Creating attribute with 'msDS-IntId' fails with ERR_UNWILLING_TO_PERFORM
285 - Adding 'msDS-IntId' on existing attribute fails with ERR_CONSTRAINT_VIOLATION
286 - Creating attribute with 'msDS-IntId' set and FLAG_SCHEMA_BASE_OBJECT flag
287 set fails with ERR_UNWILLING_TO_PERFORM
288 - Attributes created with FLAG_SCHEMA_BASE_OBJECT not set have
289 'msDS-IntId' attribute added internally
292 # 1. Create attribute without systemFlags
293 # msDS-IntId should be created if forest functional
294 # level is >= DS_DC_FUNCTION_2003
295 # and missing otherwise
296 (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-1-")
297 ldif = self._make_attr_ldif(attr_name, attr_dn)
299 # try to add msDS-IntId during Attribute creation
300 ldif_fail = ldif + "msDS-IntId: -1993108831\n"
302 self.ldb.add_ldif(ldif_fail)
303 self.fail("Adding attribute with preset msDS-IntId should fail")
304 except LdbError, (num, _):
305 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
307 # add the new attribute and update schema
308 self.ldb.add_ldif(ldif)
309 self._ldap_schemaUpdateNow()
311 # Search for created attribute
313 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
314 self.assertEquals(len(res), 1)
315 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
316 if self.forest_level >= DS_DC_FUNCTION_2003:
317 if self._is_schema_base_object(res[0]):
318 self.assertTrue("msDS-IntId" not in res[0])
320 self.assertTrue("msDS-IntId" in res[0])
322 self.assertTrue("msDS-IntId" not in res[0])
325 msg.dn = Dn(self.ldb, attr_dn)
326 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
329 self.fail("Modifying msDS-IntId should return error")
330 except LdbError, (num, _):
331 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
333 # 2. Create attribute with systemFlags = FLAG_SCHEMA_BASE_OBJECT
334 # msDS-IntId should be created if forest functional
335 # level is >= DS_DC_FUNCTION_2003
336 # and missing otherwise
337 (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-2-")
338 ldif = self._make_attr_ldif(attr_name, attr_dn)
339 ldif += "systemFlags: 16\n"
341 # try to add msDS-IntId during Attribute creation
342 ldif_fail = ldif + "msDS-IntId: -1993108831\n"
344 self.ldb.add_ldif(ldif_fail)
345 self.fail("Adding attribute with preset msDS-IntId should fail")
346 except LdbError, (num, _):
347 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
349 # add the new attribute and update schema
350 self.ldb.add_ldif(ldif)
351 self._ldap_schemaUpdateNow()
353 # Search for created attribute
355 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
356 self.assertEquals(len(res), 1)
357 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
358 if self.forest_level >= DS_DC_FUNCTION_2003:
359 if self._is_schema_base_object(res[0]):
360 self.assertTrue("msDS-IntId" not in res[0])
362 self.assertTrue("msDS-IntId" in res[0])
364 self.assertTrue("msDS-IntId" not in res[0])
367 msg.dn = Dn(self.ldb, attr_dn)
368 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
371 self.fail("Modifying msDS-IntId should return error")
372 except LdbError, (num, _):
373 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
376 def _make_class_ldif(self, class_dn, class_name):
378 dn: """ + class_dn + """
380 objectClass: classSchema
381 adminDescription: """ + class_name + """
382 adminDisplayName: """ + class_name + """
383 cn: """ + class_name + """
384 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
386 objectClassCategory: 1
387 subClassOf: organizationalPerson
389 systemMustContain: cn
394 def test_msDS_IntId_on_class(self):
395 """Testing msDs-IntId creation for Class
396 Reference: MS-ADTS - 3.1.1.2.4.8 Class classSchema"""
398 # 1. Create Class without systemFlags
399 # msDS-IntId should be created if forest functional
400 # level is >= DS_DC_FUNCTION_2003
401 # and missing otherwise
402 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-1-")
403 ldif = self._make_class_ldif(class_dn, class_name)
405 # try to add msDS-IntId during Class creation
406 ldif_add = ldif + "msDS-IntId: -1993108831\n"
407 self.ldb.add_ldif(ldif_add)
408 self._ldap_schemaUpdateNow()
410 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
411 self.assertEquals(len(res), 1)
412 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
414 # add a new Class and update schema
415 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-2-")
416 ldif = self._make_class_ldif(class_dn, class_name)
418 self.ldb.add_ldif(ldif)
419 self._ldap_schemaUpdateNow()
421 # Search for created Class
422 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
423 self.assertEquals(len(res), 1)
424 self.assertFalse("msDS-IntId" in res[0])
427 msg.dn = Dn(self.ldb, class_dn)
428 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
431 self.fail("Modifying msDS-IntId should return error")
432 except LdbError, (num, _):
433 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
435 # 2. Create Class with systemFlags = FLAG_SCHEMA_BASE_OBJECT
436 # msDS-IntId should be created if forest functional
437 # level is >= DS_DC_FUNCTION_2003
438 # and missing otherwise
439 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-3-")
440 ldif = self._make_class_ldif(class_dn, class_name)
441 ldif += "systemFlags: 16\n"
443 # try to add msDS-IntId during Class creation
444 ldif_add = ldif + "msDS-IntId: -1993108831\n"
445 self.ldb.add_ldif(ldif_add)
447 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
448 self.assertEquals(len(res), 1)
449 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
451 # add the new Class and update schema
452 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-4-")
453 ldif = self._make_class_ldif(class_dn, class_name)
454 ldif += "systemFlags: 16\n"
456 self.ldb.add_ldif(ldif)
457 self._ldap_schemaUpdateNow()
459 # Search for created Class
460 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
461 self.assertEquals(len(res), 1)
462 self.assertFalse("msDS-IntId" in res[0])
465 msg.dn = Dn(self.ldb, class_dn)
466 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
469 self.fail("Modifying msDS-IntId should return error")
470 except LdbError, (num, _):
471 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
472 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
473 self.assertEquals(len(res), 1)
474 self.assertFalse("msDS-IntId" in res[0])
477 def test_verify_msDS_IntId(self):
478 """Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
480 res = self.ldb.search(self.schema_dn, scope=SCOPE_ONELEVEL,
481 expression="objectClass=attributeSchema",
482 attrs=["systemFlags", "msDS-IntId", "attributeID", "cn"])
483 self.assertTrue(len(res) > 1)
485 if self.forest_level >= DS_DC_FUNCTION_2003:
486 if self._is_schema_base_object(ldb_msg):
487 self.assertTrue("msDS-IntId" not in ldb_msg)
489 # don't assert here as there are plenty of
490 # attributes under w2k8 that are not part of
491 # Base Schema (SYSTEM_FLAG_SCHEMA_BASE_OBJECT flag not set)
492 # has not msDS-IntId attribute set
493 #self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
494 if "msDS-IntId" not in ldb_msg:
496 print "%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"])
498 self.assertTrue("msDS-IntId" not in ldb_msg)
501 if not "://" in host:
502 if os.path.isfile(host):
503 host = "tdb://%s" % host
505 host = "ldap://%s" % host
508 if host.startswith("ldap://"):
509 # user 'paged_search' module when connecting remotely
510 ldb_options = ["modules:paged_searches"]
512 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
513 if not "tdb://" in host:
514 gc_ldb = Ldb("%s:3268" % host, credentials=creds,
515 session_info=system_session(), lp=lp)
519 runner = SubunitTestRunner()
521 if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
523 if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():