# -*- coding: utf-8 -*-
from __future__ import print_function

import optparse
import random
import sys

# The in-tree Samba bindings must be on sys.path before they are imported.
sys.path.insert(0, "bin/python")

import samba.tests
from samba.tests.subunitrun import TestProgram, SubunitOptions

from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
                 MessageElement, Dn, FLAG_MOD_REPLACE)
import samba.dsdb as dsdb
import samba.getopt as options
# Command-line handling: the Samba, version, credentials and subunit option
# groups are registered on one parser; the remaining positional argument is
# the host the tests connect to.
parser = optparse.OptionParser("urgent_replication.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# UrgentReplicationTests.setUp() reads the module-level name `host`, so the
# positional <host> argument is mandatory: show the usage and stop when it
# is missing instead of failing later with a NameError.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]
class UrgentReplicationTests(samba.tests.TestCase):
    """Check which directory changes are flagged for urgent replication.

    Every test adds, modifies and/or deletes an object and then compares the
    ``uSNHighest`` and ``uSNUrgent`` values returned by
    ``load_partition_usn`` for the affected partition.  The two values are
    asserted to be equal where the change is expected to be urgent and to
    differ where it is not.
    """

    def delete_force(self, ldb, dn):
        """Delete ``dn`` through ``ldb``; only "no such object" is tolerated.

        Any other LdbError code fails the calling test.
        """
        try:
            ldb.delete(dn, ["relax:0"])
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def setUp(self):
        """Connect to the module-level ``host`` and cache the domain DN."""
        super(UrgentReplicationTests, self).setUp()
        self.ldb = samba.tests.connect_samdb(host, global_schema=False)
        self.base_dn = self.ldb.domain_dn()

        print("baseDN: %s\n" % self.base_dn)

    def test_nonurgent_object(self):
        """Test if the urgent replication is not activated when handling a non urgent object."""
        self.ldb.add({
            "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
            # NOTE(review): this entry was missing from the damaged text and
            # has been restored; confirm the value.
            "objectclass": "user",
            "samaccountname": "nonurgenttest",
            "description": "nonurgenttest description"})

        # urgent replication should not be enabled when creating
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should not be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
        m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
                                          "description")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should not be enabled when deleting
        self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_nTDSDSA_object(self):
        """Test if the urgent replication is activated when handling a nTDSDSA object."""
        self.ldb.add({
            "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
            self.ldb.get_config_basedn(),
            "objectclass": "server",
            # NOTE(review): "cn" and "name" were missing from the damaged
            # text and have been restored; confirm the values.
            "cn": "test server",
            "name": "test server",
            "systemFlags": "50000000"}, ["relax:0"])

        # NOTE(review): the "objectclass", "options" and "instanceType" LDIF
        # lines were missing from the damaged text and have been restored;
        # confirm the values.
        self.ldb.add_ldif(
            """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
objectclass: nTDSDSA
cn: NTDS Settings test
options: 1
instanceType: 4
systemFlags: 33554432""", ["relax:0"])

        # urgent replication should be enabled when creating
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
        m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
                                      "options")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # remove the parent server object created at the start of the test
        self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)

    def test_crossRef_object(self):
        """Test if the urgent replication is activated when handling a crossRef object."""
        self.ldb.add({
            "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
            "objectClass": "crossRef",
            "cn": "test crossRef",
            "dnsRoot": self.get_loadparm().get("realm").lower(),
            # NOTE(review): this entry was missing from the damaged text and
            # has been restored; confirm the value.
            "instanceType": "4",
            "nCName": self.base_dn,
            "showInAdvancedViewOnly": "TRUE",
            "name": "test crossRef",
            "systemFlags": "1"}, ["relax:0"])

        # urgent replication should be enabled when creating
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
                                          "systemFlags")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
        res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_attributeSchema_object(self):
        """Test if the urgent replication is activated when handling an attributeSchema object"""
        # The attributeID gets a random last arc, so repeated runs do not
        # reuse the same OID.
        # NOTE(review): the "instanceType", "oMSyntax", "systemOnly" and
        # "searchFlags" LDIF lines were missing from the damaged text and
        # have been restored; confirm the values.
        self.ldb.add_ldif(
            """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
objectClass: attributeSchema
cn: test attributeSchema
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
oMSyntax: 64
systemOnly: FALSE
searchFlags: 8
lDAPDisplayName: testAttributeSchema
name: test attributeSchema""")

        # urgent replication should be enabled when creating
        res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
        m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
                                              "lDAPDisplayName")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_classSchema_object(self):
        """Test if the urgent replication is activated when handling a classSchema object."""
        # A failing add is reported and skipped; the modify check below runs
        # either way.  The defaultSecurityDescriptor value is folded: its
        # continuation line must start with exactly one space (LDIF folding).
        # NOTE(review): the "cn", "instanceType", "subClassOf", "rDNAttID",
        # "systemOnly" and "systemFlags" LDIF lines were missing from the
        # damaged text and have been restored; confirm the values.
        try:
            self.ldb.add_ldif(
                """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
objectClass: classSchema
cn: test classSchema
instanceType: 4
subClassOf: top
governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1, 100000)) + """
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
adminDescription: test classSchema
objectClassCategory: 1
lDAPDisplayName: testClassSchema
name: test classSchema
systemOnly: FALSE
systemPossSuperiors: dfsConfiguration
systemMustContain: msDFS-SchemaMajorVersion
defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
systemFlags: 16
defaultHidingValue: TRUE""")

            # urgent replication should be enabled when creating
            res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
            self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        except LdbError:
            print("Not testing urgent replication when creating classSchema object ...\n")

        # urgent replication should be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
        m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
                                              "lDAPDisplayName")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_secret_object(self):
        """Test if the urgent replication is activated when handling a secret object."""
        self.ldb.add({
            "dn": "cn=test secret,cn=System," + self.base_dn,
            "objectClass": "secret",
            # NOTE(review): this entry was missing from the damaged text and
            # has been restored; confirm the value.
            "cn": "test secret",
            "name": "test secret",
            "currentValue": "xxxxxxx"})

        # urgent replication should be enabled when creating
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
        m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
                                           "currentValue")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_rIDManager_object(self):
        """Test if the urgent replication is activated when handling a rIDManager object."""
        # NOTE(review): the "cn" and "instanceType" LDIF lines were missing
        # from the damaged text and have been restored; confirm the values.
        self.ldb.add_ldif(
            """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
objectClass: rIDManager
cn: RID Manager test
instanceType: 4
showInAdvancedViewOnly: TRUE
name: RID Manager test
systemFlags: -1946157056
isCriticalSystemObject: TRUE
rIDAvailablePool: 133001-1073741823""", ["relax:0"])

        # urgent replication should be enabled when creating
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying
        m = Message()
        m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
                                          "systemFlags")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_urgent_attributes(self):
        """Test if the urgent replication is activated when handling urgent attributes of an object."""
        self.ldb.add({
            "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
            "objectclass": "user",
            "samaccountname": "user UrgAttr test",
            "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
            # NOTE(review): "lockoutTime" and "pwdLastSet" were missing from
            # the damaged text and have been restored; confirm the values.
            "lockoutTime": "0",
            "pwdLastSet": "0",
            "description": "urgent attributes test description"})

        # urgent replication should NOT be enabled when creating
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying userAccountControl
        m = Message()
        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
        m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
                                                 "userAccountControl")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying lockoutTime
        m = Message()
        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
        m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
                                          "lockoutTime")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should be enabled when modifying pwdLastSet
        m = Message()
        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
        m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
                                         "pwdLastSet")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when modifying a
        # non-urgent attribute
        m = Message()
        m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
        m["description"] = MessageElement("updated urgent attributes test description",
                                          FLAG_MOD_REPLACE, "description")
        self.ldb.modify(m)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
        res = self.ldb.load_partition_usn(self.base_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])
# Run the tests of this module through the TestProgram imported from
# samba.tests.subunitrun, handing it the parsed subunit option group.
TestProgram(opts=subunitopts, module=__name__)