PEP8: fix E303: too many blank lines (2)
[sfrench/samba-autobuild/.git] / source4 / dsdb / tests / python / rodc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11 import uuid
12
13 sys.path.insert(0, "bin/python")
14 import samba
15 from samba.tests.subunitrun import SubunitOptions, TestProgram
16
17 import samba.getopt as options
18
19 from samba.auth import system_session
20 import ldb
21 from samba.samdb import SamDB
22 from samba.ndr import ndr_pack, ndr_unpack
23 from samba.dcerpc import drsblobs
24
25 import time
26
27
class RodcTestException(Exception):
    """Exception type declared for use by the RODC tests in this module."""
30
31
class RodcTests(samba.tests.TestCase):
    """Check how the server under test answers attempts to write to it.

    The server is the one named by the module-level HOST (expected to be
    an RODC).  Every test sends it a write and expects an LDAP referral
    back, except for the deletion of a non-existent object, which is
    expected to give ERR_NO_SUCH_OBJECT.
    """

    def setUp(self):
        """Connect to HOST and cache the values the tests build DNs from."""
        super(RodcTests, self).setUp()
        self.samdb = self._connect(HOST)

        self.base_dn = self.samdb.domain_dn()

        root = self.samdb.search(base='', scope=ldb.SCOPE_BASE,
                                 attrs=['dsServiceName'])
        self.service = root[0]['dsServiceName'][0]
        # Random tag used in the names of the objects the tests try to add.
        self.tag = uuid.uuid4().hex

    def _connect(self, url):
        """Return a SamDB connection to url using the module credentials."""
        return SamDB(url, credentials=CREDS,
                     session_info=system_session(LP), lp=LP)

    def _referral_address(self, emsg):
        """Return the ldap:// URL quoted in the referral message emsg.

        The test fails if emsg contains no such URL.
        """
        match = re.search(r'(ldap://[^>]+)>', emsg)
        if match is None:
            self.fail("referral seems not to refer to anything")
        return match.group(1)

    def _check_specific_dc(self, address):
        """Fail if the referral URL points at the domain name itself.

        address is the URL returned by _referral_address(), so it always
        begins with "ldap://".  The scheme is dropped before comparing:
        matching the whole URL against the bare DNS domain name could
        never succeed, which would leave this check without any effect.
        """
        scheme = 'ldap://'
        location = address.lower()
        if location.startswith(scheme):
            location = location[len(scheme):]

        if location.startswith(self.samdb.domain_dns_name().lower()):
            self.fail("referral address did not give a specific DC")

    def _modify_expecting_referral(self, msg, dn, attr):
        """Apply msg to the server under test and return the referral URL.

        The test fails if the modify succeeds or if it is rejected with
        anything other than ERR_REFERRAL.  dn and attr are only used in
        the failure messages.
        """
        try:
            self.samdb.modify(msg)
        except ldb.LdbError as e:
            (ecode, emsg) = e.args
        else:
            self.fail("Failed to fail to modify %s %s" % (dn, attr))

        if ecode != ldb.ERR_REFERRAL:
            self.fail("Failed to REFER when trying to modify %s %s" %
                      (dn, attr))
        return self._referral_address(emsg)

    def _delete_expecting_error(self, dn):
        """Try to delete dn and return the (ecode, emsg) of the rejection.

        The test fails if the delete succeeds.
        """
        try:
            self.samdb.delete(dn)
        except ldb.LdbError as e:
            return e.args
        self.fail("Failed to fail to delete %s" % (dn))

    def test_add_replicated_objects(self):
        """Adds must be referred to a location that accepts the add."""
        for o in (
                {
                    'dn': "ou=%s1,%s" % (self.tag, self.base_dn),
                    "objectclass": "organizationalUnit"
                },
                {
                    'dn': "cn=%s2,%s" % (self.tag, self.base_dn),
                    "objectclass": "user"
                },
                {
                    'dn': "cn=%s3,%s" % (self.tag, self.base_dn),
                    "objectclass": "group"
                },
                {
                    'dn': "cn=%s4,%s" % (self.tag, self.service),
                    "objectclass": "NTDSConnection",
                    'enabledConnection': 'TRUE',
                    'fromServer': self.base_dn,
                    'options': '0'
                },
        ):
            try:
                self.samdb.add(o)
            except ldb.LdbError as e:
                (ecode, emsg) = e.args
            else:
                self.fail("Failed to fail to add %s" % o['dn'])

            if ecode != ldb.ERR_REFERRAL:
                print(emsg)
                self.fail("Adding %s: ldb error: %s %s, wanted referral" %
                          (o['dn'], ecode, emsg))

            address = self._referral_address(emsg)

            # The referred location must accept the same add; the object
            # is removed again straight away.
            try:
                tmpdb = self._connect(address)
                tmpdb.add(o)
                tmpdb.delete(o['dn'])
            except ldb.LdbError:
                self.fail("couldn't modify referred location %s" %
                          address)

            self._check_specific_dc(address)

    def test_modify_replicated_attributes(self):
        """Modifies must be referred to a location that accepts them."""
        dn = 'CN=Guest,CN=Users,' + self.base_dn
        value = 'hallooo'
        for attr in ['carLicense', 'middleName']:
            msg = ldb.Message()
            msg.dn = ldb.Dn(self.samdb, dn)
            msg[attr] = ldb.MessageElement(value,
                                           ldb.FLAG_MOD_REPLACE,
                                           attr)
            address = self._modify_expecting_referral(msg, dn, attr)

            # The referred location must accept the same modification.
            try:
                tmpdb = self._connect(address)
                tmpdb.modify(msg)
            except ldb.LdbError:
                self.fail("couldn't modify referred location %s" %
                          address)

            self._check_specific_dc(address)

    def test_modify_nonreplicated_attributes(self):
        """Modifying badPwdCount/lastLogon/lastLogoff must be referred."""
        # some timestamp ones
        dn = 'CN=Guest,CN=Users,' + self.base_dn
        value = '123456789'
        for attr in ['badPwdCount', 'lastLogon', 'lastLogoff']:
            msg = ldb.Message()
            msg.dn = ldb.Dn(self.samdb, dn)
            msg[attr] = ldb.MessageElement(value,
                                           ldb.FLAG_MOD_REPLACE,
                                           attr)
            # Windows refers these ones even though they are non-replicated
            address = self._modify_expecting_referral(msg, dn, attr)
            self._check_specific_dc(address)

    def test_modify_nonreplicated_reps_attributes(self):
        """Modifying repsFrom on the domain DN must be referred."""
        dn = self.base_dn
        attr = 'repsFrom'

        msg = ldb.Message()
        msg.dn = ldb.Dn(self.samdb, dn)

        # Re-pack the first existing repsFrom value with one field changed
        # so that the replacement value differs from the stored one.
        res = self.samdb.search(dn, scope=ldb.SCOPE_BASE,
                                attrs=['repsFrom'])
        rep = ndr_unpack(drsblobs.repsFromToBlob, res[0]['repsFrom'][0],
                         allow_remaining=True)
        rep.ctr.result_last_attempt = -1
        value = ndr_pack(rep)

        msg[attr] = ldb.MessageElement(value,
                                       ldb.FLAG_MOD_REPLACE,
                                       attr)
        address = self._modify_expecting_referral(msg, dn, attr)
        self._check_specific_dc(address)

    def test_delete_special_objects(self):
        """Deleting the Guest user must be referred."""
        dn = 'CN=Guest,CN=Users,' + self.base_dn
        (ecode, emsg) = self._delete_expecting_error(dn)
        if ecode != ldb.ERR_REFERRAL:
            print(ecode, emsg)
            self.fail("Failed to REFER when trying to delete %s" % dn)

        address = self._referral_address(emsg)
        self._check_specific_dc(address)

    def test_no_delete_nonexistent_objects(self):
        """Deleting a missing object gives NO_SUCH_OBJECT, not a referral."""
        dn = 'CN=does-not-exist-%s,CN=Users,%s' % (self.tag, self.base_dn)
        (ecode, emsg) = self._delete_expecting_error(dn)
        if ecode != ldb.ERR_NO_SUCH_OBJECT:
            print(ecode, emsg)
            self.fail("Failed to NO_SUCH_OBJECT when trying to delete "
                      "%s (which does not exist)" % dn)
223
224
def main():
    """Parse the command line, fill in the shared globals and run the tests.

    The single positional argument names the server to test.  It is
    stored in the global HOST, after being turned into a URL when it
    does not already contain "://": an existing file becomes a tdb://
    URL, anything else an ldap:// URL.  The loadparm context and the
    credentials taken from the options are stored in the globals LP and
    CREDS.  With no positional argument the usage text is printed and
    the process exits with status 1.
    """
    global HOST, CREDS, LP

    parser = optparse.OptionParser("rodc.py [options] <host>")

    sambaopts = options.SambaOptions(parser)
    versionopts = options.VersionOptions(parser)
    credopts = options.CredentialsOptions(parser)
    subunitopts = SubunitOptions(parser)
    for group in (sambaopts, versionopts, credopts, subunitopts):
        parser.add_option_group(group)

    opts, args = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)

    # The host argument is mandatory.
    if len(args) == 0:
        parser.print_usage()
        sys.exit(1)
    HOST = args[0]

    # A bare argument is either a local database file or a host name.
    if "://" not in HOST:
        template = "tdb://%s" if os.path.isfile(HOST) else "ldap://%s"
        HOST = template % HOST

    TestProgram(module=__name__, opts=subunitopts)
257
# Only run the tests when executed as a script; importing the module must
# not parse sys.argv or start a test run.
if __name__ == "__main__":
    main()