s4-test: minor fixes to urgent_replication.py
[ira/wip.git] / source4 / lib / ldb / tests / python / urgent_replication.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
4
5 import getopt
6 import optparse
7 import sys
8 import time
9 import random
10 import base64
11 import os
12
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 from ldb import SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE
23 from samba import Ldb
24 from samba import glue
25
26 from subunit.run import SubunitTestRunner
27 import unittest
28
29 from samba.ndr import ndr_pack, ndr_unpack
30 from samba.dcerpc import security
31
32 parser = optparse.OptionParser("urgent_replication [options] <host>")
33 sambaopts = options.SambaOptions(parser)
34 parser.add_option_group(sambaopts)
35 parser.add_option_group(options.VersionOptions(parser))
36 # use command line creds if available
37 credopts = options.CredentialsOptions(parser)
38 parser.add_option_group(credopts)
39 opts, args = parser.parse_args()
40
41 if len(args) < 1:
42     parser.print_usage()
43     sys.exit(1)
44
45 host = args[0]
46
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
49
50 class UrgentReplicationTests(unittest.TestCase):
51
52     def delete_force(self, ldb, dn):
53         try:
54             ldb.delete(dn)
55         except LdbError, (num, _):
56             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
57
58     def find_basedn(self, ldb):
59         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
60                          attrs=["defaultNamingContext"])
61         self.assertEquals(len(res), 1)
62         return res[0]["defaultNamingContext"][0]
63
64     def setUp(self):
65         self.ldb = ldb
66         self.base_dn = self.find_basedn(ldb)
67
68         print "baseDN: %s\n" % self.base_dn
69
70     def test_nonurgent_object(self):
71         '''Test if the urgent replication is not activated
72            when handling a non urgent object'''
73         self.ldb.add({
74             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
75             "objectclass":"user",
76             "samaccountname":"nonurgenttest",
77             "description":"nonurgenttest description"});
78
79         ''' urgent replication should not be enabled when creating '''
80         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
81         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
82
83         ''' urgent replication should not be enabled when modifying '''
84         m = Message()
85         m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
86         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
87           "description")
88         ldb.modify(m)
89         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
90         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
91
92         ''' urgent replication should not be enabled when deleting '''
93         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
94         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
95         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
96
97
98     def test_nTDSDSA_object(self):
99         '''Test if the urgent replication is activated
100            when handling a nTDSDSA object'''
101         self.ldb.add({
102             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
103             "objectclass":"server",
104             "cn":"test server",
105             "name":"test server",
106             "systemFlags":"50000000"});
107
108         self.ldb.add_ldif(
109             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
110 objectclass: nTDSDSA
111 cn: NTDS Settings test
112 options: 1
113 instanceType: 4
114 systemFlags: 33554432""", ["relax:0"]);
115
116         ''' urgent replication should be enabled when creation '''
117         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
118         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
119
120         ''' urgent replication should NOT be enabled when modifying '''
121         m = Message()
122         m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
123         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
124           "options")
125         ldb.modify(m)
126         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
127         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
128
129         ''' urgent replication should be enabled when deleting '''
130         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
131         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
132         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
133
134         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
135
136
137     def test_crossRef_object(self):
138         '''Test if the urgent replication is activated
139            when handling a crossRef object'''
140         self.ldb.add({
141                       "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
142                       "objectClass": "crossRef",
143                       "cn": "test crossRef",
144                       "instanceType": "4",
145                       "nCName": self.base_dn,
146                       "showInAdvancedViewOnly": "TRUE",
147                       "name": "test crossRef",
148                       "systemFlags": "1"});
149
150         ''' urgent replication should be enabled when creating '''
151         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
152         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
153
154         ''' urgent replication should NOT be enabled when modifying '''
155         m = Message()
156         m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
157         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
158           "systemFlags")
159         ldb.modify(m)
160         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
161         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
162
163
164         ''' urgent replication should be enabled when deleting '''
165         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
166         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
167         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
168
169
170
171     def test_attributeSchema_object(self):
172         '''Test if the urgent replication is activated
173            when handling an attributeSchema object'''
174
175         try:
176             self.ldb.add_ldif(
177                               """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
178 objectClass: attributeSchema
179 cn: test attributeSchema
180 instanceType: 4
181 isSingleValued: FALSE
182 showInAdvancedViewOnly: FALSE
183 attributeID: 0.9.2342.19200300.100.1.1
184 attributeSyntax: 2.5.5.12
185 adminDisplayName: test attributeSchema
186 adminDescription: test attributeSchema
187 oMSyntax: 64
188 systemOnly: FALSE
189 searchFlags: 8
190 lDAPDisplayName: test attributeSchema
191 name: test attributeSchema
192 systemFlags: 0""", ["relax:0"]);
193
194             ''' urgent replication should be enabled when creating '''
195             res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
196             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
197
198         except LdbError:
199             print "Not testing urgent replication when creating attributeSchema object ...\n"
200
201         ''' urgent replication should be enabled when modifying '''
202         m = Message()
203         m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
204         m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
205           "lDAPDisplayName")
206         ldb.modify(m)
207         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
208         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
209
210
211     def test_classSchema_object(self):
212         '''Test if the urgent replication is activated
213            when handling a classSchema object'''
214         try:
215             self.ldb.add_ldif(
216                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
217 objectClass: classSchema
218 cn: test classSchema
219 instanceType: 4
220 subClassOf: top
221 governsID: 1.2.840.113556.1.5.999
222 rDNAttID: cn
223 showInAdvancedViewOnly: TRUE
224 adminDisplayName: test classSchema
225 adminDescription: test classSchema
226 objectClassCategory: 1
227 lDAPDisplayName: test classSchema
228 name: test classSchema
229 systemOnly: FALSE
230 systemPossSuperiors: dfsConfiguration
231 systemMustContain: msDFS-SchemaMajorVersion
232 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
233  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
234 systemFlags: 16
235 defaultHidingValue: TRUE""", ["relax:0"]);
236
237             ''' urgent replication should be enabled when creating '''
238             res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
239             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
240
241         except LdbError:
242             print "Not testing urgent replication when creating classSchema object ...\n"
243
244         ''' urgent replication should be enabled when modifying '''
245         m = Message()
246         m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
247         m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
248           "lDAPDisplayName")
249         ldb.modify(m)
250         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
251         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
252
253
254     def test_secret_object(self):
255
256         '''Test if the urgent replication is activated
257            when handling a secret object'''
258
259         self.ldb.add({
260             "dn": "cn=test secret,cn=System," + self.base_dn,
261             "objectClass":"secret",
262             "cn":"test secret",
263             "name":"test secret",
264             "currentValue":"xxxxxxx"});
265
266
267         ''' urgent replication should be enabled when creationg '''
268         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
269         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
270
271         ''' urgent replication should be enabled when modifying '''
272         m = Message()
273         m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
274         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
275           "currentValue")
276         ldb.modify(m)
277         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
278         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
279
280         ''' urgent replication should NOT be enabled when deleting '''
281         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
282         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
283         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
284
285
286     def test_rIDManager_object(self):
287         '''Test if the urgent replication is activated
288             when handling a rIDManager object'''
289         self.ldb.add_ldif(
290             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
291 objectClass: rIDManager
292 cn: RID Manager test
293 instanceType: 4
294 showInAdvancedViewOnly: TRUE
295 name: RID Manager test
296 systemFlags: -1946157056
297 isCriticalSystemObject: TRUE
298 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
299
300         ''' urgent replication should be enabled when creating '''
301         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
302         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
303
304         ''' urgent replication should be enabled when modifying '''
305         m = Message()
306         m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
307         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
308           "systemFlags")
309         ldb.modify(m)
310         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
311         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
312
313         ''' urgent replication should NOT be enabled when deleting '''
314         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
315         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
316         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
317
318
319     def test_urgent_attributes(self):
320         '''Test if the urgent replication is activated
321             when handling urgent attributes of an object'''
322
323         self.ldb.add({
324             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
325             "objectclass":"user",
326             "samaccountname":"user UrgAttr test",
327             "userAccountControl":"1",
328             "lockoutTime":"0",
329             "pwdLastSet":"0",
330             "description":"urgent attributes test description"});
331
332         ''' urgent replication should NOT be enabled when creating '''
333         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
334         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
335
336         ''' urgent replication should be enabled when modifying userAccountControl '''
337         m = Message()
338         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
339         m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
340           "userAccountControl")
341         ldb.modify(m)
342         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
343         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
344
345         ''' urgent replication should be enabled when modifying lockoutTime '''
346         m = Message()
347         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
348         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
349           "lockoutTime")
350         ldb.modify(m)
351         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
352         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
353
354         ''' urgent replication should be enabled when modifying pwdLastSet '''
355         m = Message()
356         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
357         m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
358           "pwdLastSet")
359         ldb.modify(m)
360         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
361         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
362
363         ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
364         m = Message()
365         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366         m["description"] = MessageElement("updated urgent attributes test description",
367                                           FLAG_MOD_REPLACE, "description")
368         ldb.modify(m)
369         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
370         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
371
372         ''' urgent replication should NOT be enabled when deleting '''
373         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
374         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
375         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
376
377
378 if not "://" in host:
379     if os.path.isfile(host):
380         host = "tdb://%s" % host
381     else:
382         host = "ldap://%s" % host
383
384
385 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
386
387 runner = SubunitTestRunner()
388 rc = 0
389 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
390     rc = 1
391 sys.exit(rc)