2 # -*- coding: utf-8 -*-
8 sys.path.append("bin/python")
10 samba.ensure_external_module("testtools", "testtools")
11 samba.ensure_external_module("subunit", "subunit/python")
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17 MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
20 import samba.dsdb as dsdb
22 from subunit.run import SubunitTestRunner
25 parser = optparse.OptionParser("urgent_replication.py [options] <host>")
26 sambaopts = options.SambaOptions(parser)
27 parser.add_option_group(sambaopts)
28 parser.add_option_group(options.VersionOptions(parser))
29 # use command line creds if available
30 credopts = options.CredentialsOptions(parser)
31 parser.add_option_group(credopts)
32 opts, args = parser.parse_args()
40 lp = sambaopts.get_loadparm()
41 creds = credopts.get_credentials(lp)
43 class UrgentReplicationTests(samba.tests.TestCase):
45 def delete_force(self, ldb, dn):
47 ldb.delete(dn, ["relax:0"])
48 except LdbError, (num, _):
49 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
52 super(UrgentReplicationTests, self).setUp()
54 self.base_dn = ldb.domain_dn()
56 print "baseDN: %s\n" % self.base_dn
58 def test_nonurgent_object(self):
59 """Test if the urgent replication is not activated
60 when handling a non urgent object"""
62 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
64 "samaccountname":"nonurgenttest",
65 "description":"nonurgenttest description"})
67 # urgent replication should not be enabled when creating
68 res = self.ldb.load_partition_usn(self.base_dn)
69 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
71 # urgent replication should not be enabled when modifying
73 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
74 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
77 res = self.ldb.load_partition_usn(self.base_dn)
78 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
80 # urgent replication should not be enabled when deleting
81 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
82 res = self.ldb.load_partition_usn(self.base_dn)
83 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
86 def test_nTDSDSA_object(self):
87 '''Test if the urgent replication is activated
88 when handling a nTDSDSA object'''
90 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
91 "objectclass":"server",
94 "systemFlags":"50000000"}, ["relax:0"])
97 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
99 cn: NTDS Settings test
102 systemFlags: 33554432""", ["relax:0"])
104 # urgent replication should be enabled when creation
105 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
106 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
108 # urgent replication should NOT be enabled when modifying
110 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
111 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
114 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
115 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
117 # urgent replication should be enabled when deleting
118 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
119 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
120 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
122 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
125 def test_crossRef_object(self):
126 '''Test if the urgent replication is activated
127 when handling a crossRef object'''
129 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
130 "objectClass": "crossRef",
131 "cn": "test crossRef",
132 "dnsRoot": lp.get("realm").lower(),
134 "nCName": self.base_dn,
135 "showInAdvancedViewOnly": "TRUE",
136 "name": "test crossRef",
137 "systemFlags": "1"}, ["relax:0"])
139 # urgent replication should be enabled when creating
140 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
141 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
143 # urgent replication should NOT be enabled when modifying
145 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
146 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
149 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
150 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
153 # urgent replication should be enabled when deleting
154 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
155 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
156 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
160 def test_attributeSchema_object(self):
161 '''Test if the urgent replication is activated
162 when handling an attributeSchema object'''
166 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
167 objectClass: attributeSchema
168 cn: test attributeSchema
170 isSingleValued: FALSE
171 showInAdvancedViewOnly: FALSE
172 attributeID: 0.9.2342.19200300.100.1.1
173 attributeSyntax: 2.5.5.12
174 adminDisplayName: test attributeSchema
175 adminDescription: test attributeSchema
179 lDAPDisplayName: test attributeSchema
180 name: test attributeSchema""")
182 # urgent replication should be enabled when creating
183 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
184 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
187 print "Not testing urgent replication when creating attributeSchema object ...\n"
189 # urgent replication should be enabled when modifying
191 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
192 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
195 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
196 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
199 def test_classSchema_object(self):
200 '''Test if the urgent replication is activated
201 when handling a classSchema object'''
204 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
205 objectClass: classSchema
209 governsID: 1.2.840.113556.1.5.999
211 showInAdvancedViewOnly: TRUE
212 adminDisplayName: test classSchema
213 adminDescription: test classSchema
214 objectClassCategory: 1
215 lDAPDisplayName: test classSchema
216 name: test classSchema
218 systemPossSuperiors: dfsConfiguration
219 systemMustContain: msDFS-SchemaMajorVersion
220 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
221 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
223 defaultHidingValue: TRUE""")
225 # urgent replication should be enabled when creating
226 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
227 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
230 print "Not testing urgent replication when creating classSchema object ...\n"
232 # urgent replication should be enabled when modifying
234 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
235 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
238 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
239 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
242 def test_secret_object(self):
243 '''Test if the urgent replication is activated
244 when handling a secret object'''
247 "dn": "cn=test secret,cn=System," + self.base_dn,
248 "objectClass":"secret",
250 "name":"test secret",
251 "currentValue":"xxxxxxx"}, ["relax:0"])
253 # urgent replication should be enabled when creating
254 res = self.ldb.load_partition_usn(self.base_dn)
255 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
257 # urgent replication should be enabled when modifying
259 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
260 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
263 res = self.ldb.load_partition_usn(self.base_dn)
264 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
266 # urgent replication should NOT be enabled when deleting
267 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
268 res = self.ldb.load_partition_usn(self.base_dn)
269 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
272 def test_rIDManager_object(self):
273 '''Test if the urgent replication is activated
274 when handling a rIDManager object'''
276 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
277 objectClass: rIDManager
280 showInAdvancedViewOnly: TRUE
281 name: RID Manager test
282 systemFlags: -1946157056
283 isCriticalSystemObject: TRUE
284 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
286 # urgent replication should be enabled when creating
287 res = self.ldb.load_partition_usn(self.base_dn)
288 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
290 # urgent replication should be enabled when modifying
292 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
293 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
296 res = self.ldb.load_partition_usn(self.base_dn)
297 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
299 # urgent replication should NOT be enabled when deleting
300 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
301 res = self.ldb.load_partition_usn(self.base_dn)
302 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
305 def test_urgent_attributes(self):
306 '''Test if the urgent replication is activated
307 when handling urgent attributes of an object'''
310 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
311 "objectclass":"user",
312 "samaccountname":"user UrgAttr test",
313 "userAccountControl":str(dsdb.UF_NORMAL_ACCOUNT),
316 "description":"urgent attributes test description"})
318 # urgent replication should NOT be enabled when creating
319 res = self.ldb.load_partition_usn(self.base_dn)
320 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
322 # urgent replication should be enabled when modifying userAccountControl
324 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
325 m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE,
326 "userAccountControl")
328 res = self.ldb.load_partition_usn(self.base_dn)
329 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
331 # urgent replication should be enabled when modifying lockoutTime
333 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
334 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
337 res = self.ldb.load_partition_usn(self.base_dn)
338 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
340 # urgent replication should be enabled when modifying pwdLastSet
342 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
343 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
346 res = self.ldb.load_partition_usn(self.base_dn)
347 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
349 # urgent replication should NOT be enabled when modifying a not-urgent
352 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
353 m["description"] = MessageElement("updated urgent attributes test description",
354 FLAG_MOD_REPLACE, "description")
356 res = self.ldb.load_partition_usn(self.base_dn)
357 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
359 # urgent replication should NOT be enabled when deleting
360 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
361 res = self.ldb.load_partition_usn(self.base_dn)
362 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
365 if not "://" in host:
366 if os.path.isfile(host):
367 host = "tdb://%s" % host
369 host = "ldap://%s" % host
372 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
375 runner = SubunitTestRunner()
377 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():