d0a63a11467e864c00156bcd3be59e4ba65ed814
[ira/wip.git] / source4 / lib / ldb / tests / python / urgent_replication.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 # This is a port of the original in testprogs/ejs/ldap.js
4
5 import getopt
6 import optparse
7 import sys
8 import time
9 import random
10 import base64
11 import os
12
13 sys.path.append("bin/python")
14 sys.path.append("../lib/subunit/python")
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 from ldb import SCOPE_BASE, LdbError
20 from ldb import ERR_NO_SUCH_OBJECT
21 from ldb import Message, MessageElement, Dn
22 from ldb import FLAG_MOD_REPLACE
23 from samba import Ldb
24 from samba import glue
25
26 from subunit.run import SubunitTestRunner
27 import unittest
28
29 from samba.ndr import ndr_pack, ndr_unpack
30 from samba.dcerpc import security
31
32 parser = optparse.OptionParser("ldap [options] <host>")
33 sambaopts = options.SambaOptions(parser)
34 parser.add_option_group(sambaopts)
35 parser.add_option_group(options.VersionOptions(parser))
36 # use command line creds if available
37 credopts = options.CredentialsOptions(parser)
38 parser.add_option_group(credopts)
39 opts, args = parser.parse_args()
40
41 if len(args) < 1:
42     parser.print_usage()
43     sys.exit(1)
44
45 host = args[0]
46
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
49
50 class UrgentReplicationTests(unittest.TestCase):
51
52     def delete_force(self, ldb, dn):
53         try:
54             ldb.delete(dn)
55         except LdbError, (num, _):
56             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
57
58     def find_basedn(self, ldb):
59         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
60                          attrs=["defaultNamingContext"])
61         self.assertEquals(len(res), 1)
62         return res[0]["defaultNamingContext"][0]
63
64     def find_configurationdn(self, ldb):
65         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"])
66         self.assertEquals(len(res), 1)
67         return res[0]["configurationNamingContext"][0]
68
69     def find_schemadn(self, ldb):
70         res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
71         self.assertEquals(len(res), 1)
72         return res[0]["schemaNamingContext"][0]
73
74     def find_domain_sid(self):
75         res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
76         return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
77
78     def setUp(self):
79         self.ldb = ldb
80         self.gc_ldb = gc_ldb
81         self.base_dn = self.find_basedn(ldb)
82         self.configuration_dn = self.find_configurationdn(ldb)
83         self.schema_dn = self.find_schemadn(ldb)
84         self.domain_sid = self.find_domain_sid()
85
86         print "baseDN: %s\n" % self.base_dn
87
88     def test_nonurgent_object(self):
89         '''Test if the urgent replication is not activated
90            when handling a non urgent object'''
91         self.ldb.add({
92             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
93             "objectclass":"user",
94             "samaccountname":"nonurgenttest",
95             "description":"nonurgenttest description"});
96
97         ''' urgent replication shouldn't be enabled when creating '''
98         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
99         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
100
101         ''' urgent replication shouldn't be enabled when modifying '''
102         m = Message()
103         m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
104         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
105           "description")
106         ldb.modify(m)
107         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
108         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
109
110         ''' urgent replication shouldn't be enabled when deleting '''
111         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
112         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
113         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
114
115
116     def test_nTDSDSA_object(self):
117         '''Test if the urgent replication is activated
118            when handling a nTDSDSA object'''
119         self.ldb.add({
120             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
121             "objectclass":"server",
122             "cn":"test server",
123             "name":"test server",
124             "systemFlags":"50000000"});
125
126         self.ldb.add_ldif(
127             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
128 objectclass: nTDSDSA
129 cn: NTDS Settings test
130 options: 1
131 instanceType: 4
132 systemFlags: 33554432""", ["relax:0"]);
133
134         ''' urgent replication should be enabled when creation '''
135         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
136         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
137
138         ''' urgent replication should NOT be enabled when modifying '''
139         m = Message()
140         m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
141         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
142           "options")
143         ldb.modify(m)
144         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
145         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
146
147         ''' urgent replication should be enabled when deleting '''
148         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
149         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
150         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
151
152         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
153
154
155     def test_crossRef_object(self):
156         '''Test if the urgent replication is activated
157            when handling a crossRef object'''
158         self.ldb.add({
159                       "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
160                       "objectClass": "crossRef",
161                       "cn": "test crossRef",
162                       "instanceType": "4",
163                       "nCName": self.base_dn,
164                       "showInAdvancedViewOnly": "TRUE",
165                       "name": "test crossRef",
166                       "systemFlags": "1"});
167
168         ''' urgent replication should be enabled when creating '''
169         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
170         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
171
172         ''' urgent replication should NOT be enabled when modifying '''
173         m = Message()
174         m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
175         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
176           "systemFlags")
177         ldb.modify(m)
178         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
179         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
180
181
182         ''' urgent replication should be enabled when deleting '''
183         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
184         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
185         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
186
187
188
189     def test_attributeSchema_object(self):
190         '''Test if the urgent replication is activated
191            when handling an attributeSchema object'''
192
193         try:
194             self.ldb.add_ldif(
195                               """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
196 objectClass: attributeSchema
197 cn: test attributeSchema
198 instanceType: 4
199 isSingleValued: FALSE
200 showInAdvancedViewOnly: FALSE
201 attributeID: 0.9.2342.19200300.100.1.1
202 attributeSyntax: 2.5.5.12
203 adminDisplayName: test attributeSchema
204 adminDescription: test attributeSchema
205 oMSyntax: 64
206 systemOnly: FALSE
207 searchFlags: 8
208 lDAPDisplayName: test attributeSchema
209 name: test attributeSchema
210 systemFlags: 0""", ["relax:0"]);
211
212             ''' urgent replication should be enabled when creating '''
213             res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
214             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
215
216         except LdbError:
217             print "Not testing urgent replication when creating attributeSchema object ...\n"
218
219         ''' urgent replication should be enabled when modifying '''
220         m = Message()
221         m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
222         m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
223           "lDAPDisplayName")
224         ldb.modify(m)
225         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
226         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
227
228
229     def test_classSchema_object(self):
230         '''Test if the urgent replication is activated
231            when handling a classSchema object'''
232         try:
233             self.ldb.add_ldif(
234                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
235 objectClass: classSchema
236 cn: test classSchema
237 instanceType: 4
238 subClassOf: top
239 governsID: 1.2.840.113556.1.5.999
240 rDNAttID: cn
241 showInAdvancedViewOnly: TRUE
242 adminDisplayName: test classSchema
243 adminDescription: test classSchema
244 objectClassCategory: 1
245 lDAPDisplayName: test classSchema
246 name: test classSchema
247 systemOnly: FALSE
248 systemPossSuperiors: dfsConfiguration
249 systemMustContain: msDFS-SchemaMajorVersion
250 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
251  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
252 systemFlags: 16
253 defaultHidingValue: TRUE""", ["relax:0"]);
254
255             ''' urgent replication should be enabled when creating '''
256             res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
257             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
258
259         except LdbError:
260             print "Not testing urgent replication when creating classSchema object ...\n"
261
262         ''' urgent replication should be enabled when modifying '''
263         m = Message()
264         m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
265         m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
266           "lDAPDisplayName")
267         ldb.modify(m)
268         res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
269         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
270
271
272     def test_secret_object(self):
273
274         '''Test if the urgent replication is activated
275            when handling a secret object'''
276
277         self.ldb.add({
278             "dn": "cn=test secret,cn=System," + self.base_dn,
279             "objectClass":"secret",
280             "cn":"test secret",
281             "name":"test secret",
282             "currentValue":"xxxxxxx"});
283
284
285         ''' urgent replication should be enabled when creationg '''
286         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
287         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
288
289         ''' urgent replication should be enabled when modifying '''
290         m = Message()
291         m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
292         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
293           "currentValue")
294         ldb.modify(m)
295         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
296         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
297
298         ''' urgent replication should NOT be enabled when deleting '''
299         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
300         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
301         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
302
303
304     def test_rIDManager_object(self):
305         '''Test if the urgent replication is activated
306             when handling a rIDManager object'''
307         self.ldb.add_ldif(
308             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
309 objectClass: rIDManager
310 cn: RID Manager test
311 instanceType: 4
312 showInAdvancedViewOnly: TRUE
313 name: RID Manager test
314 systemFlags: -1946157056
315 isCriticalSystemObject: TRUE
316 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
317
318         ''' urgent replication should be enabled when creating '''
319         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
320         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
321
322         ''' urgent replication should be enabled when modifying '''
323         m = Message()
324         m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
325         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
326           "systemFlags")
327         ldb.modify(m)
328         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
329         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
330
331         ''' urgent replication should NOT be enabled when deleting '''
332         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
333         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
334         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
335
336
337     def test_urgent_attributes(self):
338         '''Test if the urgent replication is activated
339             when handling urgent attributes of an object'''
340
341         self.ldb.add({
342             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
343             "objectclass":"user",
344             "samaccountname":"user UrgAttr test",
345             "userAccountControl":"1",
346             "lockoutTime":"0",
347             "pwdLastSet":"0",
348             "description":"urgent attributes test description"});
349
350         ''' urgent replication should NOT be enabled when creating '''
351         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
352         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
353
354         ''' urgent replication should be enabled when modifying userAccountControl '''
355         m = Message()
356         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
357         m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
358           "userAccountControl")
359         ldb.modify(m)
360         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
361         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
362
363         ''' urgent replication should be enabled when modifying lockoutTime '''
364         m = Message()
365         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
367           "lockoutTime")
368         ldb.modify(m)
369         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
370         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
371
372         ''' urgent replication should be enabled when modifying pwdLastSet '''
373         m = Message()
374         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
375         m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
376           "pwdLastSet")
377         ldb.modify(m)
378         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
379         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
380
381         ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
382         m = Message()
383         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
384         m["description"] = MessageElement("updated urgent attributes test description",
385                                           FLAG_MOD_REPLACE, "description")
386         ldb.modify(m)
387         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
388         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
389
390         ''' urgent replication should NOT be enabled when deleting '''
391         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
392         res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
393         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
394
395
396 if not "://" in host:
397     if os.path.isfile(host):
398         host = "tdb://%s" % host
399     else:
400         host = "ldap://%s" % host
401
402
403 ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
404 if not "tdb://" in host:
405     gc_ldb = Ldb("%s:3268" % host, credentials=creds,
406                  session_info=system_session(), lp=lp)
407 else:
408     gc_ldb = None
409
410 runner = SubunitTestRunner()
411 rc = 0
412 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
413     rc = 1
414 sys.exit(rc)