de412c6130bb6e7d0ef990cd1e37fb402a40a5c7
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from __future__ import print_function
5 import optparse
6 import sys
7 sys.path.insert(0, "bin/python")
8 import samba
9 from samba.tests.subunitrun import TestProgram, SubunitOptions
10
11 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
12                  MessageElement, Dn, FLAG_MOD_REPLACE)
13 import samba.tests
14 import samba.dsdb as dsdb
15 import samba.getopt as options
16 import random
17
# Command line: the Samba common and version option groups, the credentials
# group, the subunit group, and one mandatory positional <host> argument.
parser = optparse.OptionParser("urgent_replication.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# use command line credentials if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()

# Without a host there is nothing to connect to: show the usage and stop.
if not args:
    parser.print_usage()
    sys.exit(1)

# First positional argument; read by UrgentReplicationTests.setUp().
host = args[0]
35
36
class UrgentReplicationTests(samba.tests.TestCase):
    """Check which directory changes are recorded as urgent.

    Each test changes an object and then compares the ``uSNHighest`` and
    ``uSNUrgent`` values returned by ``load_partition_usn()`` for the
    partition holding that object.  The tests treat equal values as "the
    change was handled as urgent" and differing values as "it was not".
    """

    def delete_force(self, ldb, dn):
        """Delete ``dn`` through ``ldb``, tolerating an already absent entry.

        The delete is issued with the ``relax:0`` control.  An ``LdbError``
        is accepted only when its error number is ``ERR_NO_SUCH_OBJECT``;
        any other error number fails the calling test.
        """
        try:
            ldb.delete(dn, ["relax:0"])
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def setUp(self):
        """Connect to the SamDB on ``host`` and remember the domain DN."""
        super(UrgentReplicationTests, self).setUp()
        self.ldb = samba.tests.connect_samdb(host, global_schema=False)
        self.base_dn = self.ldb.domain_dn()

        print("baseDN: %s\n" % self.base_dn)

    def _delete_on_cleanup(self, dn):
        """Arrange for ``dn`` to be removed even if the test fails midway.

        ``delete_force`` accepts an entry that is already gone, so this is
        harmless when the test has deleted the object itself.
        """
        self.addCleanup(self.delete_force, self.ldb, dn)

    def _replace_attribute(self, dn, attr, value):
        """Replace attribute ``attr`` of the entry ``dn`` with ``value``."""
        m = Message()
        m.dn = Dn(self.ldb, dn)
        m[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        self.ldb.modify(m)

    def _assert_urgent(self, partition_dn):
        """Assert that uSNHighest equals uSNUrgent for ``partition_dn``."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertEqual(
            res["uSNHighest"], res["uSNUrgent"],
            "uSNUrgent does not match uSNHighest for %s" % partition_dn)

    def _assert_not_urgent(self, partition_dn):
        """Assert that uSNHighest differs from uSNUrgent for ``partition_dn``."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertNotEqual(
            res["uSNHighest"], res["uSNUrgent"],
            "uSNUrgent unexpectedly matches uSNHighest for %s" % partition_dn)

    def test_nonurgent_object(self):
        """Test if the urgent replication is not activated when handling a non urgent object."""
        user_dn = "cn=nonurgenttest,cn=users," + self.base_dn

        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "nonurgenttest",
            "description": "nonurgenttest description"})
        self._delete_on_cleanup(user_dn)

        # urgent replication should not be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when modifying
        self._replace_attribute(user_dn, "description", "new description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)

    def test_nTDSDSA_object(self):
        """Test if the urgent replication is activated when handling a nTDSDSA object."""
        config_dn = "cn=Configuration," + self.base_dn
        server_dn = ("cn=test server,cn=Servers,cn=Default-First-Site-Name,"
                     "cn=Sites," + config_dn)
        ntds_dn = "cn=NTDS Settings test," + server_dn

        self.ldb.add({
            "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
            self.ldb.get_config_basedn(),
            "objectclass": "server",
            "cn": "test server",
            "name": "test server",
            "systemFlags": "50000000"}, ["relax:0"])
        # Cleanups run last-in-first-out, so the child registered below is
        # removed before this parent entry.
        self._delete_on_cleanup(server_dn)

        self.ldb.add_ldif(
            """dn: %s
objectclass: nTDSDSA
cn: NTDS Settings test
options: 1
instanceType: 4
systemFlags: 33554432""" % ntds_dn, ["relax:0"])
        self._delete_on_cleanup(ntds_dn)

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(ntds_dn, "options", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, ntds_dn)
        self._assert_urgent(config_dn)

        self.delete_force(self.ldb, server_dn)

    def test_crossRef_object(self):
        """Test if the urgent replication is activated when handling a crossRef object."""
        config_dn = "cn=Configuration," + self.base_dn
        crossref_dn = ("CN=test crossRef,CN=Partitions,CN=Configuration," +
                       self.base_dn)

        self.ldb.add({
            "dn": crossref_dn,
            "objectClass": "crossRef",
            "cn": "test crossRef",
            "dnsRoot": self.get_loadparm().get("realm").lower(),
            "instanceType": "4",
            "nCName": self.base_dn,
            "showInAdvancedViewOnly": "TRUE",
            "name": "test crossRef",
            "systemFlags": "1"}, ["relax:0"])
        self._delete_on_cleanup(crossref_dn)

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(crossref_dn, "systemFlags", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, crossref_dn)
        self._assert_urgent(config_dn)

    def test_attributeSchema_object(self):
        """Test if the urgent replication is activated when handling an attributeSchema object"""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn
        attr_dn = ("CN=test attributeSchema,CN=Schema,CN=Configuration," +
                   self.base_dn)

        # The last arc of attributeID is random.
        self.ldb.add_ldif(
            """dn: %s
objectClass: attributeSchema
cn: test attributeSchema
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
attributeID: 1.3.6.1.4.1.7165.4.6.1.4.%d
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
oMSyntax: 64
systemOnly: FALSE
searchFlags: 8
lDAPDisplayName: testAttributeSchema
name: test attributeSchema""" % (attr_dn, random.randint(1, 100000)))

        # urgent replication should be enabled when creating
        self._assert_urgent(schema_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(attr_dn, "lDAPDisplayName",
                                "updatedTestAttributeSchema")
        self._assert_urgent(schema_dn)

    def test_classSchema_object(self):
        """Test if the urgent replication is activated when handling a classSchema object."""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn
        class_dn = ("CN=test classSchema,CN=Schema,CN=Configuration," +
                    self.base_dn)

        # Only the add is guarded: if it raises LdbError the creation check
        # is skipped and the test continues with the modify step.  An error
        # from the USN lookup itself is no longer swallowed.
        try:
            self.ldb.add_ldif(
                """dn: %s
objectClass: classSchema
cn: test classSchema
instanceType: 4
subClassOf: top
governsId: 1.3.6.1.4.1.7165.4.6.2.4.%d
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
adminDescription: test classSchema
objectClassCategory: 1
lDAPDisplayName: testClassSchema
name: test classSchema
systemOnly: FALSE
systemPossSuperiors: dfsConfiguration
systemMustContain: msDFS-SchemaMajorVersion
defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
systemFlags: 16
defaultHidingValue: TRUE""" % (class_dn, random.randint(1, 100000)))
        except LdbError:
            print("Not testing urgent replication when creating classSchema object ...\n")
        else:
            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(class_dn, "lDAPDisplayName",
                                "updatedTestClassSchema")
        self._assert_urgent(schema_dn)

    def test_secret_object(self):
        """Test if the urgent replication is activated when handling a secret object."""
        secret_dn = "cn=test secret,cn=System," + self.base_dn

        self.ldb.add({
            "dn": secret_dn,
            "objectClass": "secret",
            "cn": "test secret",
            "name": "test secret",
            "currentValue": "xxxxxxx"})
        self._delete_on_cleanup(secret_dn)

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(secret_dn, "currentValue", "yyyyyyyy")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, secret_dn)
        self._assert_not_urgent(self.base_dn)

    def test_rIDManager_object(self):
        """Test if the urgent replication is activated when handling a rIDManager object."""
        rid_manager_dn = "CN=RID Manager test,CN=System," + self.base_dn

        self.ldb.add_ldif(
            """dn: %s
objectClass: rIDManager
cn: RID Manager test
instanceType: 4
showInAdvancedViewOnly: TRUE
name: RID Manager test
systemFlags: -1946157056
isCriticalSystemObject: TRUE
rIDAvailablePool: 133001-1073741823""" % rid_manager_dn, ["relax:0"])
        self._delete_on_cleanup(rid_manager_dn)

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(rid_manager_dn, "systemFlags", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, rid_manager_dn)
        self._assert_not_urgent(self.base_dn)

    def test_urgent_attributes(self):
        """Test if the urgent replication is activated when handling urgent attributes of an object."""
        user_dn = "cn=user UrgAttr test,cn=users," + self.base_dn

        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "user UrgAttr test",
            "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
            "lockoutTime": "0",
            "pwdLastSet": "0",
            "description": "urgent attributes test description"})
        self._delete_on_cleanup(user_dn)

        # urgent replication should NOT be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should be enabled when modifying userAccountControl
        self._replace_attribute(
            user_dn, "userAccountControl",
            str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD))
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying lockoutTime
        self._replace_attribute(user_dn, "lockoutTime", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying pwdLastSet
        self._replace_attribute(user_dn, "pwdLastSet", "-1")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when modifying a not-urgent
        # attribute
        self._replace_attribute(user_dn, "description",
                                "updated urgent attributes test description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)
338
339
# Run the tests defined in this module through samba.tests.subunitrun's
# TestProgram, handing it the subunit option group built above.
TestProgram(opts=subunitopts, module=__name__)