PEP8: fix E231: missing whitespace after ','
[samba.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from __future__ import print_function
5 import optparse
6 import sys
7 sys.path.insert(0, "bin/python")
8 import samba
9 from samba.tests.subunitrun import TestProgram, SubunitOptions
10
11 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
12                  MessageElement, Dn, FLAG_MOD_REPLACE)
13 import samba.tests
14 import samba.dsdb as dsdb
15 import samba.getopt as options
16 import random
17
# Build the command line parser and register the Samba and version option
# groups on it.
parser = optparse.OptionParser("urgent_replication.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# The positional <host> argument is mandatory: without it, print the usage
# line and exit with status 1.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

# First positional argument; setUp() passes it to connect_samdb().
host = args[0]
35
36
37 class UrgentReplicationTests(samba.tests.TestCase):
38
39     def delete_force(self, ldb, dn):
40         try:
41             ldb.delete(dn, ["relax:0"])
42         except LdbError as e:
43             (num, _) = e.args
44             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
45
46     def setUp(self):
47         super(UrgentReplicationTests, self).setUp()
48         self.ldb = samba.tests.connect_samdb(host, global_schema=False)
49         self.base_dn = self.ldb.domain_dn()
50
51         print("baseDN: %s\n" % self.base_dn)
52
53     def test_nonurgent_object(self):
54         """Test if the urgent replication is not activated when handling a non urgent object."""
55         self.ldb.add({
56             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
57             "objectclass": "user",
58             "samaccountname": "nonurgenttest",
59             "description": "nonurgenttest description"})
60
61         # urgent replication should not be enabled when creating
62         res = self.ldb.load_partition_usn(self.base_dn)
63         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
64
65         # urgent replication should not be enabled when modifying
66         m = Message()
67         m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
68         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
69                                           "description")
70         self.ldb.modify(m)
71         res = self.ldb.load_partition_usn(self.base_dn)
72         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
73
74         # urgent replication should not be enabled when deleting
75         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
76         res = self.ldb.load_partition_usn(self.base_dn)
77         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
78
79     def test_nTDSDSA_object(self):
80         """Test if the urgent replication is activated when handling a nTDSDSA object."""
81         self.ldb.add({
82             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
83             self.ldb.get_config_basedn(),
84             "objectclass": "server",
85             "cn": "test server",
86             "name": "test server",
87             "systemFlags": "50000000"}, ["relax:0"])
88
89         self.ldb.add_ldif(
90             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
91 objectclass: nTDSDSA
92 cn: NTDS Settings test
93 options: 1
94 instanceType: 4
95 systemFlags: 33554432""", ["relax:0"])
96
97         # urgent replication should be enabled when creation
98         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
99         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
100
101         # urgent replication should NOT be enabled when modifying
102         m = Message()
103         m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
104         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
105                                       "options")
106         self.ldb.modify(m)
107         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
108         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
109
110         # urgent replication should be enabled when deleting
111         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
112         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
113         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
114
115         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
116
117     def test_crossRef_object(self):
118         """Test if the urgent replication is activated when handling a crossRef object."""
119         self.ldb.add({
120             "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
121             "objectClass": "crossRef",
122             "cn": "test crossRef",
123             "dnsRoot": self.get_loadparm().get("realm").lower(),
124             "instanceType": "4",
125             "nCName": self.base_dn,
126             "showInAdvancedViewOnly": "TRUE",
127             "name": "test crossRef",
128             "systemFlags": "1"}, ["relax:0"])
129
130         # urgent replication should be enabled when creating
131         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
132         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
133
134         # urgent replication should NOT be enabled when modifying
135         m = Message()
136         m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
137         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
138                                           "systemFlags")
139         self.ldb.modify(m)
140         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
141         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
142
143
144         # urgent replication should be enabled when deleting
145         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
146         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
147         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
148
149     def test_attributeSchema_object(self):
150         """Test if the urgent replication is activated when handling an attributeSchema object"""
151
152         self.ldb.add_ldif(
153             """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
154 objectClass: attributeSchema
155 cn: test attributeSchema
156 instanceType: 4
157 isSingleValued: FALSE
158 showInAdvancedViewOnly: FALSE
159 attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1, 100000)) + """
160 attributeSyntax: 2.5.5.12
161 adminDisplayName: test attributeSchema
162 adminDescription: test attributeSchema
163 oMSyntax: 64
164 systemOnly: FALSE
165 searchFlags: 8
166 lDAPDisplayName: testAttributeSchema
167 name: test attributeSchema""")
168
169         # urgent replication should be enabled when creating
170         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
171         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
172
173         # urgent replication should be enabled when modifying
174         m = Message()
175         m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
176         m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
177                                               "lDAPDisplayName")
178         self.ldb.modify(m)
179         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
180         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
181
182     def test_classSchema_object(self):
183         """Test if the urgent replication is activated when handling a classSchema object."""
184         try:
185             self.ldb.add_ldif(
186                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
187 objectClass: classSchema
188 cn: test classSchema
189 instanceType: 4
190 subClassOf: top
191 governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1, 100000)) + """
192 rDNAttID: cn
193 showInAdvancedViewOnly: TRUE
194 adminDisplayName: test classSchema
195 adminDescription: test classSchema
196 objectClassCategory: 1
197 lDAPDisplayName: testClassSchema
198 name: test classSchema
199 systemOnly: FALSE
200 systemPossSuperiors: dfsConfiguration
201 systemMustContain: msDFS-SchemaMajorVersion
202 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
203  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
204 systemFlags: 16
205 defaultHidingValue: TRUE""")
206
207             # urgent replication should be enabled when creating
208             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
209             self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
210
211         except LdbError:
212             print("Not testing urgent replication when creating classSchema object ...\n")
213
214         # urgent replication should be enabled when modifying 
215         m = Message()
216         m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
217         m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
218                                               "lDAPDisplayName")
219         self.ldb.modify(m)
220         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
221         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
222
223     def test_secret_object(self):
224         """Test if the urgent replication is activated when handling a secret object."""
225
226         self.ldb.add({
227             "dn": "cn=test secret,cn=System," + self.base_dn,
228             "objectClass": "secret",
229             "cn": "test secret",
230             "name": "test secret",
231             "currentValue": "xxxxxxx"})
232
233         # urgent replication should be enabled when creating
234         res = self.ldb.load_partition_usn(self.base_dn)
235         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
236
237         # urgent replication should be enabled when modifying
238         m = Message()
239         m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
240         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
241                                            "currentValue")
242         self.ldb.modify(m)
243         res = self.ldb.load_partition_usn(self.base_dn)
244         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
245
246         # urgent replication should NOT be enabled when deleting
247         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
248         res = self.ldb.load_partition_usn(self.base_dn)
249         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
250
251     def test_rIDManager_object(self):
252         """Test if the urgent replication is activated when handling a rIDManager object."""
253         self.ldb.add_ldif(
254             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
255 objectClass: rIDManager
256 cn: RID Manager test
257 instanceType: 4
258 showInAdvancedViewOnly: TRUE
259 name: RID Manager test
260 systemFlags: -1946157056
261 isCriticalSystemObject: TRUE
262 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
263
264         # urgent replication should be enabled when creating
265         res = self.ldb.load_partition_usn(self.base_dn)
266         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
267
268         # urgent replication should be enabled when modifying
269         m = Message()
270         m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
271         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
272                                           "systemFlags")
273         self.ldb.modify(m)
274         res = self.ldb.load_partition_usn(self.base_dn)
275         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
276
277         # urgent replication should NOT be enabled when deleting 
278         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
279         res = self.ldb.load_partition_usn(self.base_dn)
280         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
281
282     def test_urgent_attributes(self):
283         """Test if the urgent replication is activated when handling urgent attributes of an object."""
284
285         self.ldb.add({
286             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
287             "objectclass": "user",
288             "samaccountname": "user UrgAttr test",
289             "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
290             "lockoutTime": "0",
291             "pwdLastSet": "0",
292             "description": "urgent attributes test description"})
293
294         # urgent replication should NOT be enabled when creating
295         res = self.ldb.load_partition_usn(self.base_dn)
296         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
297
298         # urgent replication should be enabled when modifying userAccountControl
299         m = Message()
300         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
301         m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
302                                                  "userAccountControl")
303         self.ldb.modify(m)
304         res = self.ldb.load_partition_usn(self.base_dn)
305         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
306
307         # urgent replication should be enabled when modifying lockoutTime
308         m = Message()
309         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
310         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
311                                           "lockoutTime")
312         self.ldb.modify(m)
313         res = self.ldb.load_partition_usn(self.base_dn)
314         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
315
316         # urgent replication should be enabled when modifying pwdLastSet
317         m = Message()
318         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
319         m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
320                                          "pwdLastSet")
321         self.ldb.modify(m)
322         res = self.ldb.load_partition_usn(self.base_dn)
323         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
324
325         # urgent replication should NOT be enabled when modifying a not-urgent
326         # attribute
327         m = Message()
328         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
329         m["description"] = MessageElement("updated urgent attributes test description",
330                                           FLAG_MOD_REPLACE, "description")
331         self.ldb.modify(m)
332         res = self.ldb.load_partition_usn(self.base_dn)
333         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
334
335         # urgent replication should NOT be enabled when deleting
336         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
337         res = self.ldb.load_partition_usn(self.base_dn)
338         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
339
340
# Hand this module and the subunit option group to TestProgram.
TestProgram(module=__name__, opts=subunitopts)