s4:dsdb python tests - use "ldb.domain_dn"
[kai/samba.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 import os
7
8 sys.path.append("bin/python")
9 import samba
10 samba.ensure_external_module("testtools", "testtools")
11 samba.ensure_external_module("subunit", "subunit/python")
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17     MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
19 import samba.tests
20 import samba.dsdb as dsdb
21
22 from subunit.run import SubunitTestRunner
23 import unittest
24
25 parser = optparse.OptionParser("urgent_replication.py [options] <host>")
26 sambaopts = options.SambaOptions(parser)
27 parser.add_option_group(sambaopts)
28 parser.add_option_group(options.VersionOptions(parser))
29 # use command line creds if available
30 credopts = options.CredentialsOptions(parser)
31 parser.add_option_group(credopts)
32 opts, args = parser.parse_args()
33
34 if len(args) < 1:
35     parser.print_usage()
36     sys.exit(1)
37
38 host = args[0]
39
40 lp = sambaopts.get_loadparm()
41 creds = credopts.get_credentials(lp)
42
43 class UrgentReplicationTests(samba.tests.TestCase):
44
45     def delete_force(self, ldb, dn):
46         try:
47             ldb.delete(dn, ["relax:0"])
48         except LdbError, (num, _):
49             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
50
51     def setUp(self):
52         super(UrgentReplicationTests, self).setUp()
53         self.ldb = ldb
54         self.base_dn = ldb.domain_dn()
55
56         print "baseDN: %s\n" % self.base_dn
57
58     def test_nonurgent_object(self):
59         """Test if the urgent replication is not activated
60            when handling a non urgent object"""
61         self.ldb.add({
62             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
63             "objectclass":"user",
64             "samaccountname":"nonurgenttest",
65             "description":"nonurgenttest description"})
66
67         # urgent replication should not be enabled when creating 
68         res = self.ldb.load_partition_usn(self.base_dn)
69         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
70
71         # urgent replication should not be enabled when modifying
72         m = Message()
73         m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
74         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
75           "description")
76         ldb.modify(m)
77         res = self.ldb.load_partition_usn(self.base_dn)
78         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
79
80         # urgent replication should not be enabled when deleting
81         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
82         res = self.ldb.load_partition_usn(self.base_dn)
83         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
84
85
86     def test_nTDSDSA_object(self):
87         '''Test if the urgent replication is activated
88            when handling a nTDSDSA object'''
89         self.ldb.add({
90             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
91             "objectclass":"server",
92             "cn":"test server",
93             "name":"test server",
94             "systemFlags":"50000000"}, ["relax:0"])
95
96         self.ldb.add_ldif(
97             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
98 objectclass: nTDSDSA
99 cn: NTDS Settings test
100 options: 1
101 instanceType: 4
102 systemFlags: 33554432""", ["relax:0"])
103
104         # urgent replication should be enabled when creation
105         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
106         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
107
108         # urgent replication should NOT be enabled when modifying
109         m = Message()
110         m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
111         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
112           "options")
113         ldb.modify(m)
114         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
115         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
116
117         # urgent replication should be enabled when deleting
118         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
119         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
120         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
121
122         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
123
124
125     def test_crossRef_object(self):
126         '''Test if the urgent replication is activated
127            when handling a crossRef object'''
128         self.ldb.add({
129                       "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
130                       "objectClass": "crossRef",
131                       "cn": "test crossRef",
132                       "dnsRoot": lp.get("realm").lower(),
133                       "instanceType": "4",
134                       "nCName": self.base_dn,
135                       "showInAdvancedViewOnly": "TRUE",
136                       "name": "test crossRef",
137                       "systemFlags": "1"}, ["relax:0"])
138
139         # urgent replication should be enabled when creating
140         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
141         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
142
143         # urgent replication should NOT be enabled when modifying
144         m = Message()
145         m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
146         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
147           "systemFlags")
148         ldb.modify(m)
149         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
150         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
151
152
153         # urgent replication should be enabled when deleting
154         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
155         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
156         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
157
158
159
160     def test_attributeSchema_object(self):
161         '''Test if the urgent replication is activated
162            when handling an attributeSchema object'''
163
164         try:
165             self.ldb.add_ldif(
166                               """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
167 objectClass: attributeSchema
168 cn: test attributeSchema
169 instanceType: 4
170 isSingleValued: FALSE
171 showInAdvancedViewOnly: FALSE
172 attributeID: 0.9.2342.19200300.100.1.1
173 attributeSyntax: 2.5.5.12
174 adminDisplayName: test attributeSchema
175 adminDescription: test attributeSchema
176 oMSyntax: 64
177 systemOnly: FALSE
178 searchFlags: 8
179 lDAPDisplayName: test attributeSchema
180 name: test attributeSchema""")
181
182             # urgent replication should be enabled when creating
183             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
184             self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
185
186         except LdbError:
187             print "Not testing urgent replication when creating attributeSchema object ...\n"
188
189         # urgent replication should be enabled when modifying 
190         m = Message()
191         m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
192         m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
193           "lDAPDisplayName")
194         ldb.modify(m)
195         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
196         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
197
198
199     def test_classSchema_object(self):
200         '''Test if the urgent replication is activated
201            when handling a classSchema object'''
202         try:
203             self.ldb.add_ldif(
204                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
205 objectClass: classSchema
206 cn: test classSchema
207 instanceType: 4
208 subClassOf: top
209 governsID: 1.2.840.113556.1.5.999
210 rDNAttID: cn
211 showInAdvancedViewOnly: TRUE
212 adminDisplayName: test classSchema
213 adminDescription: test classSchema
214 objectClassCategory: 1
215 lDAPDisplayName: test classSchema
216 name: test classSchema
217 systemOnly: FALSE
218 systemPossSuperiors: dfsConfiguration
219 systemMustContain: msDFS-SchemaMajorVersion
220 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
221  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
222 systemFlags: 16
223 defaultHidingValue: TRUE""")
224
225             # urgent replication should be enabled when creating
226             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
227             self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
228
229         except LdbError:
230             print "Not testing urgent replication when creating classSchema object ...\n"
231
232         # urgent replication should be enabled when modifying 
233         m = Message()
234         m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
235         m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
236           "lDAPDisplayName")
237         ldb.modify(m)
238         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
239         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
240
241
242     def test_secret_object(self):
243         '''Test if the urgent replication is activated
244            when handling a secret object'''
245
246         self.ldb.add({
247             "dn": "cn=test secret,cn=System," + self.base_dn,
248             "objectClass":"secret",
249             "cn":"test secret",
250             "name":"test secret",
251             "currentValue":"xxxxxxx"}, ["relax:0"])
252
253         # urgent replication should be enabled when creating
254         res = self.ldb.load_partition_usn(self.base_dn)
255         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
256
257         # urgent replication should be enabled when modifying
258         m = Message()
259         m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
260         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
261           "currentValue")
262         ldb.modify(m)
263         res = self.ldb.load_partition_usn(self.base_dn)
264         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
265
266         # urgent replication should NOT be enabled when deleting 
267         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
268         res = self.ldb.load_partition_usn(self.base_dn)
269         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
270
271
272     def test_rIDManager_object(self):
273         '''Test if the urgent replication is activated
274             when handling a rIDManager object'''
275         self.ldb.add_ldif(
276             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
277 objectClass: rIDManager
278 cn: RID Manager test
279 instanceType: 4
280 showInAdvancedViewOnly: TRUE
281 name: RID Manager test
282 systemFlags: -1946157056
283 isCriticalSystemObject: TRUE
284 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
285
286         # urgent replication should be enabled when creating
287         res = self.ldb.load_partition_usn(self.base_dn)
288         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
289
290         # urgent replication should be enabled when modifying
291         m = Message()
292         m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
293         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
294           "systemFlags")
295         ldb.modify(m)
296         res = self.ldb.load_partition_usn(self.base_dn)
297         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
298
299         # urgent replication should NOT be enabled when deleting 
300         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
301         res = self.ldb.load_partition_usn(self.base_dn)
302         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
303
304
305     def test_urgent_attributes(self):
306         '''Test if the urgent replication is activated
307             when handling urgent attributes of an object'''
308
309         self.ldb.add({
310             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
311             "objectclass":"user",
312             "samaccountname":"user UrgAttr test",
313             "userAccountControl":str(dsdb.UF_NORMAL_ACCOUNT),
314             "lockoutTime":"0",
315             "pwdLastSet":"0",
316             "description":"urgent attributes test description"})
317
318         # urgent replication should NOT be enabled when creating
319         res = self.ldb.load_partition_usn(self.base_dn)
320         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
321
322         # urgent replication should be enabled when modifying userAccountControl 
323         m = Message()
324         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
325         m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE,
326           "userAccountControl")
327         ldb.modify(m)
328         res = self.ldb.load_partition_usn(self.base_dn)
329         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
330
331         # urgent replication should be enabled when modifying lockoutTime
332         m = Message()
333         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
334         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
335           "lockoutTime")
336         ldb.modify(m)
337         res = self.ldb.load_partition_usn(self.base_dn)
338         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
339
340         # urgent replication should be enabled when modifying pwdLastSet
341         m = Message()
342         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
343         m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
344           "pwdLastSet")
345         ldb.modify(m)
346         res = self.ldb.load_partition_usn(self.base_dn)
347         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
348
349         # urgent replication should NOT be enabled when modifying a not-urgent
350         # attribute
351         m = Message()
352         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
353         m["description"] = MessageElement("updated urgent attributes test description",
354                                           FLAG_MOD_REPLACE, "description")
355         ldb.modify(m)
356         res = self.ldb.load_partition_usn(self.base_dn)
357         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
358
359         # urgent replication should NOT be enabled when deleting
360         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
361         res = self.ldb.load_partition_usn(self.base_dn)
362         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
363
364
365 if not "://" in host:
366     if os.path.isfile(host):
367         host = "tdb://%s" % host
368     else:
369         host = "ldap://%s" % host
370
371
372 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
373             global_schema=False)
374
375 runner = SubunitTestRunner()
376 rc = 0
377 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
378     rc = 1
379 sys.exit(rc)