selftest: remove unused import
[samba.git] / source4 / torture / drs / python / samba_tool_drs_showrepl.py
1 # Blackbox tests for "samba-tool drs" command
2 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """Blackbox tests for samba-tool drs showrepl."""
20 import samba.tests
21 import drs_base
22 from samba.dcerpc import drsuapi
23 from samba import drs_utils
24 import os
25 import json
26 import ldb
27 import random
28 from samba.common import get_string
29
30 GUID_RE = r'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
31 HEX8_RE = r'0x[\da-f]{8}'
32 DN_RE = r'(?:(?:CN|DC)=[\\:\w -]+,)+DC=com'
33
34
35 class SambaToolDrsShowReplTests(drs_base.DrsBaseTestCase):
36     """Blackbox test case for samba-tool drs."""
37
38     def setUp(self):
39         super(SambaToolDrsShowReplTests, self).setUp()
40
41         self.dc1 = samba.tests.env_get_var_value("DC1")
42         self.dc2 = samba.tests.env_get_var_value("DC2")
43
44         creds = self.get_credentials()
45         self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(),
46                                               creds.get_username(),
47                                               creds.get_password())
48
49     def test_samba_tool_showrepl(self):
50         """Tests 'samba-tool drs showrepl' command.
51         """
52         # Output should be like:
53         #      <site-name>/<domain-name>
54         #      DSA Options: <hex-options>
55         #      DSA object GUID: <DSA-object-GUID>
56         #      DSA invocationId: <DSA-invocationId>
57         #      <Inbound-connections-list>
58         #      <Outbound-connections-list>
59         #      <KCC-objects>
60         #      ...
61         #   TODO: Perhaps we should check at least for
62         #         DSA's objectGUDI and invocationId
63         out = self.check_output("samba-tool drs showrepl "
64                                 "%s %s" % (self.dc1, self.cmdline_creds))
65
66         out = get_string(out)
67         # We want to assert that we are getting the same results, but
68         # dates and GUIDs change randomly.
69         #
70         # There are sections with headers like ==== THIS ===="
71         (header,
72          _inbound, inbound,
73          _outbound, outbound,
74          _conn, conn) = out.split("====")
75
76         self.assertEqual(_inbound, ' INBOUND NEIGHBORS ')
77         self.assertEqual(_outbound, ' OUTBOUND NEIGHBORS ')
78         self.assertEqual(_conn, ' KCC CONNECTION OBJECTS ')
79
80         self.assertRegex(header,
81                          r'^Default-First-Site-Name\\LOCALDC\s+'
82                          r"DSA Options: %s\s+"
83                          r"DSA object GUID: %s\s+"
84                          r"DSA invocationId: %s" %
85                          (HEX8_RE, GUID_RE, GUID_RE))
86
87         # We don't assert the DomainDnsZones and ForestDnsZones are
88         # there because we don't know that they have been set up yet.
89
90         for p in ['CN=Configuration,DC=samba,DC=example,DC=com',
91                   'DC=samba,DC=example,DC=com',
92                   'CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com']:
93             self.assertRegex(
94                 inbound,
95                 r'%s\n'
96                 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
97                 r'\t\tDSA object GUID: %s\n'
98                 r'\t\tLast attempt @ [^\n]+\n'
99                 r'\t\t\d+ consecutive failure\(s\).\n'
100                 r'\t\tLast success @ [^\n]+\n'
101                 r'\n' % (p, GUID_RE),
102                 msg="%s inbound missing" % p)
103
104             self.assertRegex(
105                 outbound,
106                 r'%s\n'
107                 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
108                 r'\t\tDSA object GUID: %s\n'
109                 r'\t\tLast attempt @ [^\n]+\n'
110                 r'\t\t\d+ consecutive failure\(s\).\n'
111                 r'\t\tLast success @ [^\n]+\n'
112                 r'\n' % (p, GUID_RE),
113                 msg="%s outbound missing" % p)
114
115         self.assertRegex(conn,
116                          r'Connection --\n'
117                          r'\tConnection name: %s\n'
118                          r'\tEnabled        : TRUE\n'
119                          r'\tServer DNS name : \w+.samba.example.com\n'
120                          r'\tServer DN name  : %s'
121                          r'\n' % (GUID_RE, DN_RE))
122
123     def test_samba_tool_showrepl_json(self):
124         """Tests 'samba-tool drs showrepl --json' command.
125         """
126         out = self.check_output("samba-tool drs showrepl %s %s --json" %
127                                 (self.dc1, self.cmdline_creds))
128         d = json.loads(get_string(out))
129         self.assertEqual(set(d), set(['repsFrom',
130                                       'repsTo',
131                                       "NTDSConnections",
132                                       "dsa"]))
133
134         # dsa
135         for k in ["objectGUID", "invocationId"]:
136             self.assertRegex(d['dsa'][k], '^%s$' % GUID_RE)
137         self.assertTrue(isinstance(d['dsa']["options"], int))
138
139         # repsfrom and repsto
140         for reps in (d['repsFrom'], d['repsTo']):
141             for r in reps:
142                 for k in ('NC dn', "NTDS DN"):
143                     self.assertRegex(r[k], '^%s$' % DN_RE)
144                 for k in ("last attempt time",
145                           "last attempt message",
146                           "last success"):
147                     self.assertTrue(isinstance(r[k], str))
148                 self.assertRegex(r["DSA objectGUID"], '^%s$' % GUID_RE)
149                 self.assertTrue(isinstance(r["consecutive failures"], int))
150
151         # ntdsconnection
152         for n in d["NTDSConnections"]:
153             self.assertRegex(n["dns name"],
154                              r'^[\w]+\.samba\.example\.com$')
155             self.assertRegex(n["name"], "^%s$" % GUID_RE)
156             self.assertTrue(isinstance(n['enabled'], bool))
157             self.assertTrue(isinstance(n['options'], int))
158             self.assertTrue(isinstance(n['replicates NC'], list))
159             self.assertRegex(n["remote DN"], "^%s$" % DN_RE)
160
161     def _force_all_reps(self, samdb, dc, direction):
162         if direction == 'inbound':
163             info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_NEIGHBORS
164         elif direction == 'outbound':
165             info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_REPSTO
166         else:
167             raise ValueError("expected 'inbound' or 'outbound'")
168
169         self._enable_all_repl(dc)
170         lp = self.get_loadparm()
171         creds = self.get_credentials()
172         drsuapi_conn, drsuapi_handle, _ = drs_utils.drsuapi_connect(dc, lp, creds)
173         req1 = drsuapi.DsReplicaGetInfoRequest1()
174         req1.info_type = info_type
175         _, info = drsuapi_conn.DsReplicaGetInfo(drsuapi_handle, 1, req1)
176         for x in info.array:
177             # you might think x.source_dsa_address was the thing, but no.
178             # and we need to filter out RODCs and deleted DCs
179
180             res = []
181             try:
182                 res = samdb.search(base=x.source_dsa_obj_dn,
183                                    scope=ldb.SCOPE_BASE,
184                                    attrs=['msDS-isRODC', 'isDeleted'],
185                                    controls=['show_deleted:0'])
186             except ldb.LdbError as e:
187                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
188                     raise
189
190             if (len(res) == 0 or
191                 len(res[0].get('msDS-isRODC', '')) > 0 or
192                 res[0]['isDeleted'] == 'TRUE'):
193                 continue
194
195             dsa_dn = str(ldb.Dn(samdb, x.source_dsa_obj_dn).parent())
196             try:
197                 res = samdb.search(base=dsa_dn,
198                                    scope=ldb.SCOPE_BASE,
199                                    attrs=['dNSHostName'])
200             except ldb.LdbError as e:
201                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
202                     raise
203                 continue
204
205             if len(res) == 0:
206                 print("server %s has no dNSHostName" % dsa_dn)
207                 continue
208
209             remote = res[0].get('dNSHostName', [''])[0]
210             if remote:
211                 self._enable_all_repl(remote)
212
213             if direction == 'inbound':
214                 src, dest = remote, dc
215             else:
216                 src, dest = dc, remote
217             self._net_drs_replicate(dest, src, forced=True)
218
219     def test_samba_tool_showrepl_pull_summary_all_good(self):
220         """Tests 'samba-tool drs showrepl --pull-summary' command."""
221         # To be sure that all is good we need to force replication
222         # with everyone (because others might have it turned off), and
223         # turn replication on for them in case they suddenly decide to
224         # try again.
225         #
226         # We don't restore them to the non-auto-replication state.
227         samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
228                                self.cmdline_creds)
229         self._enable_all_repl(self.dc1)
230         self._force_all_reps(samdb1, self.dc1, 'inbound')
231         self._force_all_reps(samdb1, self.dc1, 'outbound')
232         old_no_color = os.environ.get('NO_COLOR')
233         all_good_green = "\033[1;32m[ALL GOOD]\033[0m\n"
234         all_good = "[ALL GOOD]\n"
235
236         try:
237             out = self.check_output(
238                 "samba-tool drs showrepl --pull-summary %s %s" %
239                 (self.dc1, self.cmdline_creds))
240             out = get_string(out)
241             self.assertStringsEqual(out, all_good)
242             out = get_string(out)
243
244             out = self.check_output("samba-tool drs showrepl --pull-summary "
245                                     "--color=yes %s %s" %
246                                     (self.dc1, self.cmdline_creds))
247             out = get_string(out)
248             self.assertStringsEqual(out, all_good_green)
249
250             # --verbose output is still quiet when all is good.
251             out = self.check_output(
252                 "samba-tool drs showrepl --pull-summary -v %s %s" %
253                 (self.dc1, self.cmdline_creds))
254             out = get_string(out)
255             self.assertStringsEqual(out, all_good)
256
257             out = self.check_output("samba-tool drs showrepl --pull-summary -v "
258                                     "--color=always %s %s" %
259                                     (self.dc1, self.cmdline_creds))
260             out = get_string(out)
261             self.assertStringsEqual(out, all_good_green)
262
263             out = self.check_output("samba-tool drs showrepl --pull-summary -v "
264                                     "--color=never %s %s" %
265                                     (self.dc1, self.cmdline_creds))
266             out = get_string(out)
267             self.assertStringsEqual(out, all_good)
268
269             os.environ['NO_COLOR'] = 'bean'
270
271             out = self.check_output("samba-tool drs showrepl --pull-summary -v "
272                                     "--color=auto %s %s" %
273                                     (self.dc1, self.cmdline_creds))
274             out = get_string(out)
275             self.assertStringsEqual(out, all_good)
276
277             os.environ['NO_COLOR'] = ''
278
279             out = self.check_output("samba-tool drs showrepl --pull-summary -v "
280                                     "--color=auto %s %s" %
281                                     (self.dc1, self.cmdline_creds))
282             out = get_string(out)
283             self.assertStringsEqual(out, all_good_green)
284
285         except samba.tests.BlackboxProcessError as e:
286             self.fail(str(e))
287         finally:
288             if old_no_color is None:
289                 os.environ.pop('NO_COLOR', None)
290             else:
291                 os.environ['NO_COLOR'] = old_no_color
292
293     def test_samba_tool_showrepl_summary_forced_failure(self):
294         """Tests 'samba-tool drs showrepl --summary' command when we break the
295         network on purpose.
296         """
297         self.addCleanup(self._enable_all_repl, self.dc1)
298         self._disable_all_repl(self.dc1)
299
300         samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
301                                self.cmdline_creds)
302         samdb2 = self.getSamDB("-H", "ldap://%s" % self.dc2, "-U",
303                                self.cmdline_creds)
304         domain_dn = samdb1.domain_dn()
305
306         # Add some things to NOT replicate
307         ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
308         ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
309         samdb1.add({
310             "dn": ou1,
311             "objectclass": "organizationalUnit"
312         })
313         self.addCleanup(samdb1.delete, ou1, ['tree_delete:1'])
314         samdb2.add({
315             "dn": ou2,
316             "objectclass": "organizationalUnit"
317         })
318         self.addCleanup(samdb2.delete, ou2, ['tree_delete:1'])
319
320         dn1 = 'cn=u1.%%d,%s' % (ou1)
321         dn2 = 'cn=u2.%%d,%s' % (ou2)
322
323         try:
324             for i in range(100):
325                 samdb1.add({
326                     "dn": dn1 % i,
327                     "objectclass": "user"
328                 })
329                 samdb2.add({
330                     "dn": dn2 % i,
331                     "objectclass": "user"
332                 })
333                 out = self.check_output("samba-tool drs showrepl --summary -v "
334                                         "%s %s" %
335                                         (self.dc1, self.cmdline_creds))
336                 out = get_string(out)
337                 self.assertStringsEqual('[ALL GOOD]', out, strip=True)
338                 out = self.check_output("samba-tool drs showrepl --summary -v "
339                                         "--color=yes %s %s" %
340                                         (self.dc2, self.cmdline_creds))
341                 out = get_string(out)
342                 self.assertIn('[ALL GOOD]', out)
343
344         except samba.tests.BlackboxProcessError as e:
345             e_stdout = get_string(e.stdout)
346             e_stderr = get_string(e.stderr)
347             print("Good, failed as expected after %d rounds: %r" % (i, e.cmd))
348             self.assertIn('There are failing connections', e_stdout,
349                           msg=('stdout: %r\nstderr: %r\nretcode: %s'
350                                '\nmessage: %r\ncmd: %r') % (e_stdout,
351                                                             e_stderr,
352                                                             e.returncode,
353                                                             e.msg,
354                                                             e.cmd))
355             self.assertRegex(
356                 e_stdout,
357                 r'result 845[67] '
358                 r'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
359                 msg=("The process should have failed "
360                      "because replication was forced off, "
361                      "but it failed for some other reason."))
362             self.assertIn('consecutive failure(s).', e_stdout)
363         else:
364             self.fail("No DRS failure noticed after 100 rounds of trying")