2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
16 import samba.getopt as options
18 from samba.auth import system_session
19 from ldb import SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE
24 from samba import glue
26 from subunit.run import SubunitTestRunner
29 from samba.ndr import ndr_pack, ndr_unpack
30 from samba.dcerpc import security
32 parser = optparse.OptionParser("urgent_replication [options] <host>")
33 sambaopts = options.SambaOptions(parser)
34 parser.add_option_group(sambaopts)
35 parser.add_option_group(options.VersionOptions(parser))
36 # use command line creds if available
37 credopts = options.CredentialsOptions(parser)
38 parser.add_option_group(credopts)
39 opts, args = parser.parse_args()
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
50 class UrgentReplicationTests(unittest.TestCase):
52 def delete_force(self, ldb, dn):
55 except LdbError, (num, _):
56 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
58 def find_basedn(self, ldb):
59 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
60 attrs=["defaultNamingContext"])
61 self.assertEquals(len(res), 1)
62 return res[0]["defaultNamingContext"][0]
66 self.base_dn = self.find_basedn(ldb)
68 print "baseDN: %s\n" % self.base_dn
70 def test_nonurgent_object(self):
71 '''Test if the urgent replication is not activated
72 when handling a non urgent object'''
74 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
76 "samaccountname":"nonurgenttest",
77 "description":"nonurgenttest description"});
79 ''' urgent replication should not be enabled when creating '''
80 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
81 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
83 ''' urgent replication should not be enabled when modifying '''
85 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
86 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
89 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
90 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
92 ''' urgent replication should not be enabled when deleting '''
93 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
94 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
95 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
98 def test_nTDSDSA_object(self):
99 '''Test if the urgent replication is activated
100 when handling a nTDSDSA object'''
102 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
103 "objectclass":"server",
105 "name":"test server",
106 "systemFlags":"50000000"});
109 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
111 cn: NTDS Settings test
114 systemFlags: 33554432""", ["relax:0"]);
116 ''' urgent replication should be enabled when creation '''
117 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
118 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
120 ''' urgent replication should NOT be enabled when modifying '''
122 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
123 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
126 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
127 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
129 ''' urgent replication should be enabled when deleting '''
130 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
131 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
132 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
134 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
137 def test_crossRef_object(self):
138 '''Test if the urgent replication is activated
139 when handling a crossRef object'''
141 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
142 "objectClass": "crossRef",
143 "cn": "test crossRef",
145 "nCName": self.base_dn,
146 "showInAdvancedViewOnly": "TRUE",
147 "name": "test crossRef",
148 "systemFlags": "1"});
150 ''' urgent replication should be enabled when creating '''
151 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
152 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
154 ''' urgent replication should NOT be enabled when modifying '''
156 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
157 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
160 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
161 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
164 ''' urgent replication should be enabled when deleting '''
165 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
166 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
167 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
171 def test_attributeSchema_object(self):
172 '''Test if the urgent replication is activated
173 when handling an attributeSchema object'''
177 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
178 objectClass: attributeSchema
179 cn: test attributeSchema
181 isSingleValued: FALSE
182 showInAdvancedViewOnly: FALSE
183 attributeID: 0.9.2342.19200300.100.1.1
184 attributeSyntax: 2.5.5.12
185 adminDisplayName: test attributeSchema
186 adminDescription: test attributeSchema
190 lDAPDisplayName: test attributeSchema
191 name: test attributeSchema
192 systemFlags: 0""", ["relax:0"]);
194 ''' urgent replication should be enabled when creating '''
195 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
196 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
199 print "Not testing urgent replication when creating attributeSchema object ...\n"
201 ''' urgent replication should be enabled when modifying '''
203 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
204 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
207 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
208 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
211 def test_classSchema_object(self):
212 '''Test if the urgent replication is activated
213 when handling a classSchema object'''
216 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
217 objectClass: classSchema
221 governsID: 1.2.840.113556.1.5.999
223 showInAdvancedViewOnly: TRUE
224 adminDisplayName: test classSchema
225 adminDescription: test classSchema
226 objectClassCategory: 1
227 lDAPDisplayName: test classSchema
228 name: test classSchema
230 systemPossSuperiors: dfsConfiguration
231 systemMustContain: msDFS-SchemaMajorVersion
232 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
233 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
235 defaultHidingValue: TRUE""", ["relax:0"]);
237 ''' urgent replication should be enabled when creating '''
238 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
239 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
242 print "Not testing urgent replication when creating classSchema object ...\n"
244 ''' urgent replication should be enabled when modifying '''
246 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
247 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
250 res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
251 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
254 def test_secret_object(self):
256 '''Test if the urgent replication is activated
257 when handling a secret object'''
260 "dn": "cn=test secret,cn=System," + self.base_dn,
261 "objectClass":"secret",
263 "name":"test secret",
264 "currentValue":"xxxxxxx"});
267 ''' urgent replication should be enabled when creationg '''
268 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
269 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
271 ''' urgent replication should be enabled when modifying '''
273 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
274 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
277 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
278 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
280 ''' urgent replication should NOT be enabled when deleting '''
281 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
282 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
283 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
286 def test_rIDManager_object(self):
287 '''Test if the urgent replication is activated
288 when handling a rIDManager object'''
290 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
291 objectClass: rIDManager
294 showInAdvancedViewOnly: TRUE
295 name: RID Manager test
296 systemFlags: -1946157056
297 isCriticalSystemObject: TRUE
298 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
300 ''' urgent replication should be enabled when creating '''
301 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
302 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
304 ''' urgent replication should be enabled when modifying '''
306 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
307 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
310 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
311 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
313 ''' urgent replication should NOT be enabled when deleting '''
314 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
315 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
316 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
319 def test_urgent_attributes(self):
320 '''Test if the urgent replication is activated
321 when handling urgent attributes of an object'''
324 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
325 "objectclass":"user",
326 "samaccountname":"user UrgAttr test",
327 "userAccountControl":"1",
330 "description":"urgent attributes test description"});
332 ''' urgent replication should NOT be enabled when creating '''
333 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
334 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
336 ''' urgent replication should be enabled when modifying userAccountControl '''
338 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
339 m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
340 "userAccountControl")
342 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
343 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
345 ''' urgent replication should be enabled when modifying lockoutTime '''
347 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
348 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
351 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
352 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
354 ''' urgent replication should be enabled when modifying pwdLastSet '''
356 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
357 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
360 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
361 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
363 ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
365 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366 m["description"] = MessageElement("updated urgent attributes test description",
367 FLAG_MOD_REPLACE, "description")
369 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
370 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
372 ''' urgent replication should NOT be enabled when deleting '''
373 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
374 res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
375 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
378 if not "://" in host:
379 if os.path.isfile(host):
380 host = "tdb://%s" % host
382 host = "ldap://%s" % host
385 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
387 runner = SubunitTestRunner()
389 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():