2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
13 sys.path.append("bin/python")
15 import samba.getopt as options
17 from samba.auth import system_session
18 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
19 from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
20 from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
21 from ldb import ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_OTHER, ERR_INVALID_DN_SYNTAX
22 from ldb import ERR_NO_SUCH_ATTRIBUTE, ERR_INSUFFICIENT_ACCESS_RIGHTS
23 from ldb import ERR_OBJECT_CLASS_VIOLATION, ERR_NOT_ALLOWED_ON_RDN
24 from ldb import ERR_NAMING_VIOLATION, ERR_CONSTRAINT_VIOLATION
25 from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
26 from ldb import Message, MessageElement, Dn
27 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
29 from samba import UF_NORMAL_ACCOUNT, UF_TEMP_DUPLICATE_ACCOUNT
30 from samba import UF_SERVER_TRUST_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT
31 from samba import UF_INTERDOMAIN_TRUST_ACCOUNT
32 from samba import UF_PASSWD_NOTREQD, UF_ACCOUNTDISABLE
33 from samba import GTYPE_SECURITY_BUILTIN_LOCAL_GROUP
34 from samba import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_DOMAIN_LOCAL_GROUP
35 from samba import GTYPE_SECURITY_UNIVERSAL_GROUP
36 from samba import GTYPE_DISTRIBUTION_GLOBAL_GROUP
37 from samba import GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP
38 from samba import GTYPE_DISTRIBUTION_UNIVERSAL_GROUP
39 from samba import ATYPE_NORMAL_ACCOUNT, ATYPE_WORKSTATION_TRUST
40 from samba import ATYPE_SECURITY_GLOBAL_GROUP, ATYPE_SECURITY_LOCAL_GROUP
41 from samba import ATYPE_SECURITY_UNIVERSAL_GROUP
42 from samba import ATYPE_DISTRIBUTION_GLOBAL_GROUP
43 from samba import ATYPE_DISTRIBUTION_LOCAL_GROUP
44 from samba import ATYPE_DISTRIBUTION_UNIVERSAL_GROUP
45 from samba import DS_DC_FUNCTION_2003
47 from subunit.run import SubunitTestRunner
50 from samba.ndr import ndr_pack, ndr_unpack
51 from samba.dcerpc import security
53 parser = optparse.OptionParser("ldap [options] <host>")
54 sambaopts = options.SambaOptions(parser)
55 parser.add_option_group(sambaopts)
56 parser.add_option_group(options.VersionOptions(parser))
57 # use command line creds if available
58 credopts = options.CredentialsOptions(parser)
59 parser.add_option_group(credopts)
60 opts, args = parser.parse_args()
68 lp = sambaopts.get_loadparm()
69 creds = credopts.get_credentials(lp)
72 class SchemaTests(unittest.TestCase):
73 def delete_force(self, ldb, dn):
76 except LdbError, (num, _):
77 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
def find_schemadn(self, ldb):
    """Look up the schema naming context through `ldb`.

    Performs a base-scope search on the empty DN requesting only the
    ``schemaNamingContext`` attribute, asserts that exactly one entry
    is returned, and hands back the first value of that attribute.
    """
    wanted = "schemaNamingContext"
    entries = ldb.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=[wanted])
    self.assertEquals(len(entries), 1)
    first_entry = entries[0]
    return first_entry[wanted][0]
def find_basedn(self, ldb):
    """Look up the default naming context through `ldb`.

    Issues a base-scope search on the empty DN for the single attribute
    ``defaultNamingContext``, asserts that one entry comes back, and
    returns that attribute's first value.
    """
    query = dict(base="", expression="", scope=SCOPE_BASE,
                 attrs=["defaultNamingContext"])
    found = ldb.search(**query)
    self.assertEquals(len(found), 1)
    naming_contexts = found[0]["defaultNamingContext"]
    return naming_contexts[0]
92 self.schema_dn = self.find_schemadn(ldb)
93 self.base_dn = self.find_basedn(ldb)
def test_generated_schema(self):
    """Testing we can read the generated schema via LDAP

    Requests objectClasses, attributeTypes and dITContentRules by name
    from the ``cn=aggregate`` entry under the schema DN and checks that
    exactly one entry is returned and that it carries all three.
    """
    aggregate_dn = "cn=aggregate," + self.schema_dn
    requested = ["objectClasses", "attributeTypes", "dITContentRules"]
    res = self.ldb.search(aggregate_dn, scope=SCOPE_BASE, attrs=requested)
    self.assertEquals(len(res), 1)
    entry = res[0]
    # Checked in the same order as before so the first failure is unchanged.
    for attr_name in ("dITContentRules", "objectClasses", "attributeTypes"):
        self.assertTrue(attr_name in entry)
104 def test_generated_schema_is_operational(self):
105 """Testing we don't get the generated schema via LDAP by default"""
106 res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
108 self.assertEquals(len(res), 1)
109 self.assertFalse("dITContentRules" in res[0])
110 self.assertFalse("objectClasses" in res[0])
111 self.assertFalse("attributeTypes" in res[0])
113 def test_schemaUpdateNow(self):
114 """Testing schemaUpdateNow"""
115 attr_name = "test-Attr" + time.strftime("%s", time.gmtime())
116 attr_ldap_display_name = attr_name.replace("-", "")
119 dn: CN=%s,%s""" % (attr_name, self.schema_dn) + """
121 objectClass: attributeSchema
122 adminDescription: """ + attr_name + """
123 adminDisplayName: """ + attr_name + """
124 cn: """ + attr_name + """
125 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
126 attributeSyntax: 2.5.5.12
132 self.ldb.add_ldif(ldif)
134 # Search for created attribute
136 res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
137 self.assertEquals(len(res), 1)
138 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
139 self.assertTrue("schemaIDGUID" in res[0])
141 # Samba requires a "schemaUpdateNow" here.
142 # TODO: remove this when Samba is fixed
149 self.ldb.modify_ldif(ldif)
151 class_name = "test-Class" + time.strftime("%s", time.gmtime())
152 class_ldap_display_name = class_name.replace("-", "")
154 # First try to create a class with a wrong "defaultObjectCategory"
156 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
158 objectClass: classSchema
159 defaultObjectCategory: CN=_
160 adminDescription: """ + class_name + """
161 adminDisplayName: """ + class_name + """
162 cn: """ + class_name + """
163 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
165 objectClassCategory: 1
166 subClassOf: organizationalPerson
169 systemMustContain: cn
170 systemMustContain: """ + attr_ldap_display_name + """
174 self.ldb.add_ldif(ldif)
176 except LdbError, (num, _):
177 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
180 dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
182 objectClass: classSchema
183 adminDescription: """ + class_name + """
184 adminDisplayName: """ + class_name + """
185 cn: """ + class_name + """
186 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
188 objectClassCategory: 1
189 subClassOf: organizationalPerson
192 systemMustContain: cn
193 systemMustContain: """ + attr_ldap_display_name + """
196 self.ldb.add_ldif(ldif)
198 # Search for created objectclass
200 res = self.ldb.search("cn=%s,%s" % (class_name, self.schema_dn), scope=SCOPE_BASE, attrs=["*"])
201 self.assertEquals(len(res), 1)
202 self.assertEquals(res[0]["lDAPDisplayName"][0], class_ldap_display_name)
203 self.assertEquals(res[0]["defaultObjectCategory"][0], res[0]["distinguishedName"][0])
204 self.assertTrue("schemaIDGUID" in res[0])
212 self.ldb.modify_ldif(ldif)
214 object_name = "obj" + time.strftime("%s", time.gmtime())
217 dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
218 objectClass: organizationalPerson
220 objectClass: """ + class_ldap_display_name + """
222 cn: """ + object_name + """
224 objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
225 distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
226 name: """ + object_name + """
227 """ + attr_ldap_display_name + """: test
229 self.ldb.add_ldif(ldif)
231 # Search for created object
233 res = self.ldb.search("cn=%s,cn=Users,%s" % (object_name, self.base_dn), scope=SCOPE_BASE, attrs=["*"])
234 self.assertEquals(len(res), 1)
236 self.delete_force(self.ldb, "cn=%s,cn=Users,%s" % (object_name, self.base_dn))
239 class SchemaTests_msDS_IntId(unittest.TestCase):
243 res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
244 self.assertEquals(len(res), 1)
245 self.schema_dn = res[0]["schemaNamingContext"][0]
246 self.base_dn = res[0]["defaultNamingContext"][0]
247 self.forest_level = int(res[0]["forestFunctionality"][0])
249 def _ldap_schemaUpdateNow(self):
256 self.ldb.modify_ldif(ldif)
258 def _make_obj_names(self, prefix):
259 class_name = prefix + time.strftime("%s", time.gmtime())
260 class_ldap_name = class_name.replace("-", "")
261 class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
262 return (class_name, class_ldap_name, class_dn)
264 def _is_schema_base_object(self, ldb_msg):
265 """Test systemFlags for SYSTEM_FLAG_SCHEMA_BASE_OBJECT (16)"""
267 if "systemFlags" in ldb_msg:
268 systemFlags = int(ldb_msg["systemFlags"][0])
269 return (systemFlags & 16) != 0
271 def _make_attr_ldif(self, attr_name, attr_dn):
273 dn: """ + attr_dn + """
275 objectClass: attributeSchema
276 adminDescription: """ + attr_name + """
277 adminDisplayName: """ + attr_name + """
278 cn: """ + attr_name + """
279 attributeId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9940
280 attributeSyntax: 2.5.5.12
288 def test_msDS_IntId_on_attr(self):
289 """Testing msDs-IntId creation for Attributes.
290 See MS-ADTS - 3.1.1.Attributes
292 This test should verify that:
293 - Creating attribute with 'msDS-IntId' fails with ERR_UNWILLING_TO_PERFORM
294 - Adding 'msDS-IntId' on existing attribute fails with ERR_CONSTRAINT_VIOLATION
295 - Creating attribute with 'msDS-IntId' set and FLAG_SCHEMA_BASE_OBJECT flag
296 set fails with ERR_UNWILLING_TO_PERFORM
297 - Attributes created with FLAG_SCHEMA_BASE_OBJECT not set have
298 'msDS-IntId' attribute added internally
301 # 1. Create attribute without systemFlags
302 # msDS-IntId should be created if forest functional
303 # level is >= DS_DC_FUNCTION_2003
304 # and missing otherwise
305 (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-1-")
306 ldif = self._make_attr_ldif(attr_name, attr_dn)
308 # try to add msDS-IntId during Attribute creation
309 ldif_fail = ldif + "msDS-IntId: -1993108831\n"
311 self.ldb.add_ldif(ldif_fail)
312 self.fail("Adding attribute with preset msDS-IntId should fail")
313 except LdbError, (num, _):
314 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
316 # add the new attribute and update schema
317 self.ldb.add_ldif(ldif)
318 self._ldap_schemaUpdateNow()
320 # Search for created attribute
322 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
323 self.assertEquals(len(res), 1)
324 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
325 if self.forest_level >= DS_DC_FUNCTION_2003:
326 if self._is_schema_base_object(res[0]):
327 self.assertTrue("msDS-IntId" not in res[0])
329 self.assertTrue("msDS-IntId" in res[0])
331 self.assertTrue("msDS-IntId" not in res[0])
334 msg.dn = Dn(self.ldb, attr_dn)
335 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
338 self.fail("Modifying msDS-IntId should return error")
339 except LdbError, (num, _):
340 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
342 # 2. Create attribute with systemFlags = FLAG_SCHEMA_BASE_OBJECT
343 # msDS-IntId should be created if forest functional
344 # level is >= DS_DC_FUNCTION_2003
345 # and missing otherwise
346 (attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("msDS-IntId-Attr-2-")
347 ldif = self._make_attr_ldif(attr_name, attr_dn)
348 ldif += "systemFlags: 16\n"
350 # try to add msDS-IntId during Attribute creation
351 ldif_fail = ldif + "msDS-IntId: -1993108831\n"
353 self.ldb.add_ldif(ldif_fail)
354 self.fail("Adding attribute with preset msDS-IntId should fail")
355 except LdbError, (num, _):
356 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
358 # add the new attribute and update schema
359 self.ldb.add_ldif(ldif)
360 self._ldap_schemaUpdateNow()
362 # Search for created attribute
364 res = self.ldb.search(attr_dn, scope=SCOPE_BASE, attrs=["*"])
365 self.assertEquals(len(res), 1)
366 self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_name)
367 if self.forest_level >= DS_DC_FUNCTION_2003:
368 if self._is_schema_base_object(res[0]):
369 self.assertTrue("msDS-IntId" not in res[0])
371 self.assertTrue("msDS-IntId" in res[0])
373 self.assertTrue("msDS-IntId" not in res[0])
376 msg.dn = Dn(self.ldb, attr_dn)
377 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
380 self.fail("Modifying msDS-IntId should return error")
381 except LdbError, (num, _):
382 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
385 def _make_class_ldif(self, class_dn, class_name):
387 dn: """ + class_dn + """
389 objectClass: classSchema
390 adminDescription: """ + class_name + """
391 adminDisplayName: """ + class_name + """
392 cn: """ + class_name + """
393 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.9939
395 objectClassCategory: 1
396 subClassOf: organizationalPerson
398 systemMustContain: cn
403 def test_msDS_IntId_on_class(self):
404 """Testing msDs-IntId creation for Class
405 Reference: MS-ADTS - 3.1.1.2.4.8 Class classSchema"""
407 # 1. Create Class without systemFlags
408 # msDS-IntId should be created if forest functional
409 # level is >= DS_DC_FUNCTION_2003
410 # and missing otherwise
411 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-1-")
412 ldif = self._make_class_ldif(class_dn, class_name)
414 # try to add msDS-IntId during Class creation
415 ldif_add = ldif + "msDS-IntId: -1993108831\n"
416 self.ldb.add_ldif(ldif_add)
417 self._ldap_schemaUpdateNow()
419 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
420 self.assertEquals(len(res), 1)
421 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
423 # add a new Class and update schema
424 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-2-")
425 ldif = self._make_class_ldif(class_dn, class_name)
427 self.ldb.add_ldif(ldif)
428 self._ldap_schemaUpdateNow()
430 # Search for created Class
431 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
432 self.assertEquals(len(res), 1)
433 self.assertFalse("msDS-IntId" in res[0])
436 msg.dn = Dn(self.ldb, class_dn)
437 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
440 self.fail("Modifying msDS-IntId should return error")
441 except LdbError, (num, _):
442 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
444 # 2. Create Class with systemFlags = FLAG_SCHEMA_BASE_OBJECT
445 # msDS-IntId should be created if forest functional
446 # level is >= DS_DC_FUNCTION_2003
447 # and missing otherwise
448 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-3-")
449 ldif = self._make_class_ldif(class_dn, class_name)
450 ldif += "systemFlags: 16\n"
452 # try to add msDS-IntId during Class creation
453 ldif_add = ldif + "msDS-IntId: -1993108831\n"
454 self.ldb.add_ldif(ldif_add)
456 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
457 self.assertEquals(len(res), 1)
458 self.assertEquals(res[0]["msDS-IntId"][0], "-1993108831")
460 # add the new Class and update schema
461 (class_name, class_ldap_name, class_dn) = self._make_obj_names("msDS-IntId-Class-4-")
462 ldif = self._make_class_ldif(class_dn, class_name)
463 ldif += "systemFlags: 16\n"
465 self.ldb.add_ldif(ldif)
466 self._ldap_schemaUpdateNow()
468 # Search for created Class
469 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
470 self.assertEquals(len(res), 1)
471 self.assertFalse("msDS-IntId" in res[0])
474 msg.dn = Dn(self.ldb, class_dn)
475 msg["msDS-IntId"] = MessageElement("-1993108831", FLAG_MOD_REPLACE, "msDS-IntId")
478 self.fail("Modifying msDS-IntId should return error")
479 except LdbError, (num, _):
480 self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
481 res = self.ldb.search(class_dn, scope=SCOPE_BASE, attrs=["*"])
482 self.assertEquals(len(res), 1)
483 self.assertFalse("msDS-IntId" in res[0])
486 def test_verify_msDS_IntId(self):
487 """Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
489 res = self.ldb.search(self.schema_dn, scope=SCOPE_ONELEVEL,
490 expression="objectClass=attributeSchema",
491 attrs=["systemFlags", "msDS-IntId", "attributeID", "cn"])
492 self.assertTrue(len(res) > 1)
494 if self.forest_level >= DS_DC_FUNCTION_2003:
495 if self._is_schema_base_object(ldb_msg):
496 self.assertTrue("msDS-IntId" not in ldb_msg)
498 # don't assert here as there are plenty of
499 # attributes under w2k8 that are not part of
500 # Base Schema (SYSTEM_FLAG_SCHEMA_BASE_OBJECT flag not set)
501 # yet have no msDS-IntId attribute set
502 #self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
503 if "msDS-IntId" not in ldb_msg:
505 print "%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"])
507 self.assertTrue("msDS-IntId" not in ldb_msg)
510 if not "://" in host:
511 if os.path.isfile(host):
512 host = "tdb://%s" % host
514 host = "ldap://%s" % host
517 if host.startswith("ldap://"):
518 # use the 'paged_searches' module when connecting remotely
519 ldb_options = ["modules:paged_searches"]
521 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
522 if not "tdb://" in host:
523 gc_ldb = Ldb("%s:3268" % host, credentials=creds,
524 session_info=system_session(), lp=lp)
528 runner = SubunitTestRunner()
530 if not runner.run(unittest.makeSuite(SchemaTests)).wasSuccessful():
532 if not runner.run(unittest.makeSuite(SchemaTests_msDS_IntId)).wasSuccessful():