2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
16 import samba.getopt as options
18 from samba.auth import system_session
19 from ldb import SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE
24 from samba import glue
26 from subunit.run import SubunitTestRunner
29 from samba.ndr import ndr_pack, ndr_unpack
30 from samba.dcerpc import security
# Build the command-line parser and register the Samba, version and
# credentials option groups on it.
32 parser = optparse.OptionParser("ldap [options] <host>")
33 sambaopts = options.SambaOptions(parser)
34 parser.add_option_group(sambaopts)
35 parser.add_option_group(options.VersionOptions(parser))
36 # use command line creds if available
37 credopts = options.CredentialsOptions(parser)
38 parser.add_option_group(credopts)
39 opts, args = parser.parse_args()
# Loadparm context and credentials derived from the parsed options; both are
# later passed to the Ldb(...) connections as lp= and credentials=.
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
50 class UrgentReplicationTests(unittest.TestCase):
# Delete helper taking a connection and a DN. When an LdbError is caught, the
# only error code the test accepts is ERR_NO_SUCH_OBJECT; any other code fails
# the assertion.
52 def delete_force(self, ldb, dn):
# Python 2 tuple-unpacking form: binds the error number, discards the message.
55 except LdbError, (num, _):
56 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
def find_basedn(self, ldb):
    """Return the ``defaultNamingContext`` value published at the empty base DN.

    Runs a base-scope search on ``ldb`` with an empty base and an empty
    expression, requesting only ``defaultNamingContext``.

    :param ldb: connection object providing ``search()``.
    :return: the first value of the ``defaultNamingContext`` attribute.

    The test fails if the search does not return exactly one entry.
    """
    res = ldb.search(base="", expression="", scope=SCOPE_BASE,
                     attrs=["defaultNamingContext"])
    # assertEqual is the supported name; assertEquals is a deprecated alias.
    self.assertEqual(len(res), 1)
    return res[0]["defaultNamingContext"][0]
def find_configurationdn(self, ldb):
    """Return the ``configurationNamingContext`` value published at the empty base DN.

    Runs a base-scope search on ``ldb`` with an empty base and an empty
    expression, requesting only ``configurationNamingContext``.

    :param ldb: connection object providing ``search()``.
    :return: the first value of the ``configurationNamingContext`` attribute.

    The test fails if the search does not return exactly one entry.
    """
    res = ldb.search(base="", expression="", scope=SCOPE_BASE,
                     attrs=["configurationNamingContext"])
    # assertEqual is the supported name; assertEquals is a deprecated alias.
    self.assertEqual(len(res), 1)
    return res[0]["configurationNamingContext"][0]
def find_schemadn(self, ldb):
    """Return the ``schemaNamingContext`` value published at the empty base DN.

    Runs a base-scope search on ``ldb`` with an empty base and an empty
    expression, requesting only ``schemaNamingContext``.

    :param ldb: connection object providing ``search()``.
    :return: the first value of the ``schemaNamingContext`` attribute.

    The test fails if the search does not return exactly one entry.
    """
    res = ldb.search(base="", expression="", scope=SCOPE_BASE,
                     attrs=["schemaNamingContext"])
    # assertEqual is the supported name; assertEquals is a deprecated alias.
    self.assertEqual(len(res), 1)
    return res[0]["schemaNamingContext"][0]
def find_domain_sid(self):
    """Return the ``objectSid`` of the ``self.base_dn`` entry as a ``security.dom_sid``.

    Searches ``self.ldb`` at base scope on ``self.base_dn`` and NDR-unpacks
    the first ``objectSid`` value of the first returned entry.
    """
    base_entries = self.ldb.search(base=self.base_dn,
                                   expression="(objectClass=*)",
                                   scope=SCOPE_BASE)
    packed_sid = base_entries[0]["objectSid"][0]
    return ndr_unpack(security.dom_sid, packed_sid)
81 self.base_dn = self.find_basedn(ldb)
82 self.configuration_dn = self.find_configurationdn(ldb)
83 self.schema_dn = self.find_schemadn(ldb)
84 self.domain_sid = self.find_domain_sid()
86 print "baseDN: %s\n" % self.base_dn
# Walks an entry under cn=users through create, modify and delete. After each
# step it loads the USN values for self.base_dn and requires uSNHighest and
# uSNUrgent to differ, which this test treats as "urgent replication not
# triggered".
88 def test_nonurgent_object(self):
89 '''Test if the urgent replication is not activated
90 when handling a non urgent object'''
# Attributes of the test entry: DN, sAMAccountName and description.
92 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
94 "samaccountname":"nonurgenttest",
95 "description":"nonurgenttest description"});
97 ''' urgent replication shouldn't be enabled when creating '''
# res is indexed by "uSNHighest" and "uSNUrgent"; the two must not match here.
98 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
99 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
101 ''' urgent replication shouldn't be enabled when modifying '''
# Replace-modification of the description attribute on the same DN.
# NOTE(review): Dn() receives the bare name ldb here, while the surrounding
# calls use self.ldb -- confirm both refer to the same connection.
103 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
104 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
107 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
108 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
110 ''' urgent replication shouldn't be enabled when deleting '''
# delete_force also serves as cleanup of the test entry.
111 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
112 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
113 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a "test server" entry and an "NTDS Settings test" child beneath it in
# the Sites subtree of cn=Configuration. Expected outcome per step, judged by
# comparing uSNHighest with uSNUrgent: equal after create, different after
# modify, equal after delete.
116 def test_nTDSDSA_object(self):
117 '''Test if the urgent replication is activated
118 when handling a nTDSDSA object'''
# Parent entry (objectclass "server") under which the NTDS Settings DN lives.
120 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
121 "objectclass":"server",
123 "name":"test server",
124 "systemFlags":"50000000"});
# LDIF text for the NTDS Settings entry, with self.base_dn substituted into
# the DN; ["relax:0"] is presumably the control list for the add -- confirm.
127 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
129 cn: NTDS Settings test
132 systemFlags: 33554432""", ["relax:0"]);
134 ''' urgent replication should be enabled when creation '''
# NOTE(review): the partition DN is rebuilt from self.base_dn although the
# instance also carries self.configuration_dn -- confirm which is preferred.
135 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
136 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
138 ''' urgent replication should NOT be enabled when modifying '''
# Replace-modification of the "options" attribute on the NTDS Settings entry.
140 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
141 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
144 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
145 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
147 ''' urgent replication should be enabled when deleting '''
148 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
149 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
150 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Cleanup: remove the parent server entry as well (no USN check afterwards).
152 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
# Uses a "test crossRef" entry under CN=Partitions,CN=Configuration. Expected
# outcome per step, judged by comparing uSNHighest with uSNUrgent of the
# Configuration DN: equal after create, different after modify, equal after
# delete.
155 def test_crossRef_object(self):
156 '''Test if the urgent replication is activated
157 when handling a crossRef object'''
# Attributes of the crossRef entry; nCName is set to self.base_dn.
159 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
160 "objectClass": "crossRef",
161 "cn": "test crossRef",
163 "nCName": self.base_dn,
164 "showInAdvancedViewOnly": "TRUE",
165 "name": "test crossRef",
166 "systemFlags": "1"});
168 ''' urgent replication should be enabled when creating '''
169 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
170 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
172 ''' urgent replication should NOT be enabled when modifying '''
# Replace-modification of systemFlags on the crossRef entry.
174 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
175 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
178 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
179 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
182 ''' urgent replication should be enabled when deleting '''
# delete_force also serves as cleanup of the test entry.
183 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
184 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
185 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a "test attributeSchema" entry under cn=Schema,CN=Configuration and
# expects uSNHighest to equal uSNUrgent for the Schema DN after both the
# create and the modify step. No delete step is visible in this test.
189 def test_attributeSchema_object(self):
190 '''Test if the urgent replication is activated
191 when handling an attributeSchema object'''
# LDIF text for the schema entry, with self.base_dn substituted into the DN;
# ["relax:0"] is presumably the control list for the add -- confirm.
195 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
196 objectClass: attributeSchema
197 cn: test attributeSchema
199 isSingleValued: FALSE
200 showInAdvancedViewOnly: FALSE
201 attributeID: 0.9.2342.19200300.100.1.1
202 attributeSyntax: 2.5.5.12
203 adminDisplayName: test attributeSchema
204 adminDescription: test attributeSchema
208 lDAPDisplayName: test attributeSchema
209 name: test attributeSchema
210 systemFlags: 0""", ["relax:0"]);
212 ''' urgent replication should be enabled when creating '''
213 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
214 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# NOTE(review): this message reads like the fallback for a create step that
# could not be performed; the condition guarding it is not shown -- confirm.
217 print "Not testing urgent replication when creating attributeSchema object ...\n"
219 ''' urgent replication should be enabled when modifying '''
# Replace-modification of lDAPDisplayName on the schema entry.
221 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
222 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
225 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
226 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a "test classSchema" entry under CN=Schema,CN=Configuration and expects
# uSNHighest to equal uSNUrgent for the Schema DN after both the create and
# the modify step. No delete step is visible in this test.
229 def test_classSchema_object(self):
230 '''Test if the urgent replication is activated
231 when handling a classSchema object'''
# LDIF text for the class entry, with self.base_dn substituted into the DN;
# ["relax:0"] is presumably the control list for the add -- confirm.
234 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
235 objectClass: classSchema
239 governsID: 1.2.840.113556.1.5.999
241 showInAdvancedViewOnly: TRUE
242 adminDisplayName: test classSchema
243 adminDescription: test classSchema
244 objectClassCategory: 1
245 lDAPDisplayName: test classSchema
246 name: test classSchema
248 systemPossSuperiors: dfsConfiguration
249 systemMustContain: msDFS-SchemaMajorVersion
250 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
251 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
253 defaultHidingValue: TRUE""", ["relax:0"]);
255 ''' urgent replication should be enabled when creating '''
256 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
257 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# NOTE(review): this message reads like the fallback for a create step that
# could not be performed; the condition guarding it is not shown -- confirm.
260 print "Not testing urgent replication when creating classSchema object ...\n"
262 ''' urgent replication should be enabled when modifying '''
# Replace-modification of lDAPDisplayName on the class entry.
264 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
265 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
268 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
269 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a "test secret" entry under cn=System. Expected outcome per step,
# judged by comparing uSNHighest with uSNUrgent for self.base_dn: equal after
# create, equal after modify, different after delete.
272 def test_secret_object(self):
274 '''Test if the urgent replication is activated
275 when handling a secret object'''
# Attributes of the secret entry, including an initial currentValue.
278 "dn": "cn=test secret,cn=System," + self.base_dn,
279 "objectClass":"secret",
281 "name":"test secret",
282 "currentValue":"xxxxxxx"});
285 ''' urgent replication should be enabled when creationg '''
286 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
287 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
289 ''' urgent replication should be enabled when modifying '''
# Replace-modification of currentValue on the secret entry.
291 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
292 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
295 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
296 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
298 ''' urgent replication should NOT be enabled when deleting '''
# delete_force also serves as cleanup of the test entry.
299 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
300 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
301 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a "RID Manager test" entry under CN=System. Expected outcome per step,
# judged by comparing uSNHighest with uSNUrgent for self.base_dn: equal after
# create, equal after modify, different after delete.
304 def test_rIDManager_object(self):
305 '''Test if the urgent replication is activated
306 when handling a rIDManager object'''
# LDIF text for the rIDManager entry, with self.base_dn substituted into the
# DN; ["relax:0"] is presumably the control list for the add -- confirm.
308 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
309 objectClass: rIDManager
312 showInAdvancedViewOnly: TRUE
313 name: RID Manager test
314 systemFlags: -1946157056
315 isCriticalSystemObject: TRUE
316 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
318 ''' urgent replication should be enabled when creating '''
319 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
320 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
322 ''' urgent replication should be enabled when modifying '''
# Replace-modification of systemFlags on the rIDManager entry.
324 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
325 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
328 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
329 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
331 ''' urgent replication should NOT be enabled when deleting '''
# delete_force also serves as cleanup of the test entry.
332 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
333 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
334 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Uses a user entry under cn=users and changes individual attributes on it.
# Judged by comparing uSNHighest with uSNUrgent for self.base_dn, the test
# expects: different after create, equal after replacing userAccountControl,
# lockoutTime and pwdLastSet, different after replacing description, and
# different after delete.
337 def test_urgent_attributes(self):
338 '''Test if the urgent replication is activated
339 when handling urgent attributes of an object'''
# Attributes of the user entry the modifications below operate on.
342 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
343 "objectclass":"user",
344 "samaccountname":"user UrgAttr test",
345 "userAccountControl":"1",
348 "description":"urgent attributes test description"});
350 ''' urgent replication should NOT be enabled when creating '''
351 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
352 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
354 ''' urgent replication should be enabled when modifying userAccountControl '''
# Each modification targets the same DN and replaces a single attribute.
356 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
357 m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
358 "userAccountControl")
360 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
361 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
363 ''' urgent replication should be enabled when modifying lockoutTime '''
365 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
369 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
370 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
372 ''' urgent replication should be enabled when modifying pwdLastSet '''
374 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
375 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
378 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
379 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
381 ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
# Control case: description is the attribute expected not to count as urgent.
383 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
384 m["description"] = MessageElement("updated urgent attributes test description",
385 FLAG_MOD_REPLACE, "description")
387 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
388 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
390 ''' urgent replication should NOT be enabled when deleting '''
# delete_force also serves as cleanup of the test entry.
391 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
392 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
393 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# A host value without "://" is turned into an ldb URL: tdb:// when it names
# an existing file; the ldap:// form is presumably the fallback -- confirm.
396 if not "://" in host:
397 if os.path.isfile(host):
398 host = "tdb://%s" % host
400 host = "ldap://%s" % host
# Module-level connection; the test methods pass this name to Dn().
403 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
# For hosts that are not tdb:// URLs a second connection is opened on port 3268.
404 if not "tdb://" in host:
405 gc_ldb = Ldb("%s:3268" % host, credentials=creds,
406 session_info=system_session(), lp=lp)
# Run the UrgentReplicationTests suite through the subunit runner and branch
# on whether it succeeded.
410 runner = SubunitTestRunner()
412 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():