2 # -*- coding: utf-8 -*-
4 from __future__ import print_function
7 sys.path.insert(0, "bin/python")
9 from samba.tests.subunitrun import TestProgram, SubunitOptions
11 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
12 MessageElement, Dn, FLAG_MOD_REPLACE)
14 import samba.dsdb as dsdb
15 import samba.getopt as options
18 parser = optparse.OptionParser("urgent_replication.py [options] <host>")
19 sambaopts = options.SambaOptions(parser)
20 parser.add_option_group(sambaopts)
21 parser.add_option_group(options.VersionOptions(parser))
23 # use command line creds if available
24 credopts = options.CredentialsOptions(parser)
25 parser.add_option_group(credopts)
26 subunitopts = SubunitOptions(parser)
27 parser.add_option_group(subunitopts)
28 opts, args = parser.parse_args()
37 class UrgentReplicationTests(samba.tests.TestCase):
39 def delete_force(self, ldb, dn):
41 ldb.delete(dn, ["relax:0"])
44 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
47 super(UrgentReplicationTests, self).setUp()
48 self.ldb = samba.tests.connect_samdb(host, global_schema=False)
49 self.base_dn = self.ldb.domain_dn()
51 print("baseDN: %s\n" % self.base_dn)
53 def test_nonurgent_object(self):
54 """Test if the urgent replication is not activated when handling a non urgent object."""
56 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
57 "objectclass": "user",
58 "samaccountname": "nonurgenttest",
59 "description": "nonurgenttest description"})
61 # urgent replication should not be enabled when creating
62 res = self.ldb.load_partition_usn(self.base_dn)
63 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
65 # urgent replication should not be enabled when modifying
67 m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
68 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
71 res = self.ldb.load_partition_usn(self.base_dn)
72 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
74 # urgent replication should not be enabled when deleting
75 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
76 res = self.ldb.load_partition_usn(self.base_dn)
77 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
79 def test_nTDSDSA_object(self):
80 """Test if the urgent replication is activated when handling a nTDSDSA object."""
82 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
83 self.ldb.get_config_basedn(),
84 "objectclass": "server",
86 "name": "test server",
87 "systemFlags": "50000000"}, ["relax:0"])
90 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
92 cn: NTDS Settings test
95 systemFlags: 33554432""", ["relax:0"])
97 # urgent replication should be enabled when creation
98 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
99 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
101 # urgent replication should NOT be enabled when modifying
103 m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
104 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
107 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
108 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
110 # urgent replication should be enabled when deleting
111 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
112 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
113 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
115 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
117 def test_crossRef_object(self):
118 """Test if the urgent replication is activated when handling a crossRef object."""
120 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
121 "objectClass": "crossRef",
122 "cn": "test crossRef",
123 "dnsRoot": self.get_loadparm().get("realm").lower(),
125 "nCName": self.base_dn,
126 "showInAdvancedViewOnly": "TRUE",
127 "name": "test crossRef",
128 "systemFlags": "1"}, ["relax:0"])
130 # urgent replication should be enabled when creating
131 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
132 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
134 # urgent replication should NOT be enabled when modifying
136 m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
137 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
140 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
141 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
143 # urgent replication should be enabled when deleting
144 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
145 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
146 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
148 def test_attributeSchema_object(self):
149 """Test if the urgent replication is activated when handling an attributeSchema object"""
152 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
153 objectClass: attributeSchema
154 cn: test attributeSchema
156 isSingleValued: FALSE
157 showInAdvancedViewOnly: FALSE
158 attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1, 100000)) + """
159 attributeSyntax: 2.5.5.12
160 adminDisplayName: test attributeSchema
161 adminDescription: test attributeSchema
165 lDAPDisplayName: testAttributeSchema
166 name: test attributeSchema""")
168 # urgent replication should be enabled when creating
169 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
170 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
172 # urgent replication should be enabled when modifying
174 m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
175 m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
178 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
179 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
181 def test_classSchema_object(self):
182 """Test if the urgent replication is activated when handling a classSchema object."""
185 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
186 objectClass: classSchema
190 governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1, 100000)) + """
192 showInAdvancedViewOnly: TRUE
193 adminDisplayName: test classSchema
194 adminDescription: test classSchema
195 objectClassCategory: 1
196 lDAPDisplayName: testClassSchema
197 name: test classSchema
199 systemPossSuperiors: dfsConfiguration
200 systemMustContain: msDFS-SchemaMajorVersion
201 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
202 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
204 defaultHidingValue: TRUE""")
206 # urgent replication should be enabled when creating
207 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
208 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
211 print("Not testing urgent replication when creating classSchema object ...\n")
213 # urgent replication should be enabled when modifying
215 m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
216 m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
219 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
220 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
222 def test_secret_object(self):
223 """Test if the urgent replication is activated when handling a secret object."""
226 "dn": "cn=test secret,cn=System," + self.base_dn,
227 "objectClass": "secret",
229 "name": "test secret",
230 "currentValue": "xxxxxxx"})
232 # urgent replication should be enabled when creating
233 res = self.ldb.load_partition_usn(self.base_dn)
234 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
236 # urgent replication should be enabled when modifying
238 m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
239 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
242 res = self.ldb.load_partition_usn(self.base_dn)
243 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
245 # urgent replication should NOT be enabled when deleting
246 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
247 res = self.ldb.load_partition_usn(self.base_dn)
248 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
250 def test_rIDManager_object(self):
251 """Test if the urgent replication is activated when handling a rIDManager object."""
253 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
254 objectClass: rIDManager
257 showInAdvancedViewOnly: TRUE
258 name: RID Manager test
259 systemFlags: -1946157056
260 isCriticalSystemObject: TRUE
261 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
263 # urgent replication should be enabled when creating
264 res = self.ldb.load_partition_usn(self.base_dn)
265 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
267 # urgent replication should be enabled when modifying
269 m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
270 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
273 res = self.ldb.load_partition_usn(self.base_dn)
274 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
276 # urgent replication should NOT be enabled when deleting
277 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
278 res = self.ldb.load_partition_usn(self.base_dn)
279 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
281 def test_urgent_attributes(self):
282 """Test if the urgent replication is activated when handling urgent attributes of an object."""
285 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
286 "objectclass": "user",
287 "samaccountname": "user UrgAttr test",
288 "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
291 "description": "urgent attributes test description"})
293 # urgent replication should NOT be enabled when creating
294 res = self.ldb.load_partition_usn(self.base_dn)
295 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
297 # urgent replication should be enabled when modifying userAccountControl
299 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
300 m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
301 "userAccountControl")
303 res = self.ldb.load_partition_usn(self.base_dn)
304 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
306 # urgent replication should be enabled when modifying lockoutTime
308 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
309 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
312 res = self.ldb.load_partition_usn(self.base_dn)
313 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
315 # urgent replication should be enabled when modifying pwdLastSet
317 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
318 m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
321 res = self.ldb.load_partition_usn(self.base_dn)
322 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
324 # urgent replication should NOT be enabled when modifying a not-urgent
327 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
328 m["description"] = MessageElement("updated urgent attributes test description",
329 FLAG_MOD_REPLACE, "description")
331 res = self.ldb.load_partition_usn(self.base_dn)
332 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
334 # urgent replication should NOT be enabled when deleting
335 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
336 res = self.ldb.load_partition_usn(self.base_dn)
337 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
340 TestProgram(module=__name__, opts=subunitopts)