2fe8ab5b638ceb3c886f49faa40ba7e5f7f56205
[samba.git] / source4 / torture / drs / python / samba_tool_drs_showrepl.py
1 # Blackbox tests for "samba-tool drs" command
2 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """Blackbox tests for samba-tool drs showrepl."""
20 from __future__ import print_function
21 import samba.tests
22 import drs_base
23 from samba.dcerpc import drsuapi
24 from samba import drs_utils
25 import re
26 import json
27 import ldb
28 import random
29 from samba.compat import PY3
30
31 if PY3:
32     json_str = str
33 else:
34     json_str = unicode
35
36 GUID_RE = r'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
37 HEX8_RE = r'0x[\da-f]{8}'
38 DN_RE = r'(?:(?:CN|DC)=[\\:\w -]+,)+DC=com'
39
40
41 class SambaToolDrsShowReplTests(drs_base.DrsBaseTestCase):
42     """Blackbox test case for samba-tool drs."""
43
44     def setUp(self):
45         super(SambaToolDrsShowReplTests, self).setUp()
46
47         self.dc1 = samba.tests.env_get_var_value("DC1")
48         self.dc2 = samba.tests.env_get_var_value("DC2")
49
50         creds = self.get_credentials()
51         self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(),
52                                               creds.get_username(),
53                                               creds.get_password())
54
55     def test_samba_tool_showrepl(self):
56         """Tests 'samba-tool drs showrepl' command.
57         """
58         # Output should be like:
59         #      <site-name>/<domain-name>
60         #      DSA Options: <hex-options>
61         #      DSA object GUID: <DSA-object-GUID>
62         #      DSA invocationId: <DSA-invocationId>
63         #      <Inbound-connections-list>
64         #      <Outbound-connections-list>
65         #      <KCC-objects>
66         #      ...
67         #   TODO: Perhaps we should check at least for
68         #         DSA's objectGUDI and invocationId
69         out = self.check_output("samba-tool drs showrepl "
70                                 "%s %s" % (self.dc1, self.cmdline_creds))
71
72         # We want to assert that we are getting the same results, but
73         # dates and GUIDs change randomly.
74         #
75         # There are sections with headers like ==== THIS ===="
76         (header,
77          _inbound, inbound,
78          _outbound, outbound,
79          _conn, conn) = out.split("====")
80
81         self.assertEqual(_inbound, ' INBOUND NEIGHBORS ')
82         self.assertEqual(_outbound, ' OUTBOUND NEIGHBORS ')
83         self.assertEqual(_conn, ' KCC CONNECTION OBJECTS ')
84
85         self.assertRegexpMatches(header,
86                                  r'^Default-First-Site-Name\\LOCALDC\s+'
87                                  r"DSA Options: %s\s+"
88                                  r"DSA object GUID: %s\s+"
89                                  r"DSA invocationId: %s" %
90                                  (HEX8_RE, GUID_RE, GUID_RE))
91
92         # We don't assert the DomainDnsZones and ForestDnsZones are
93         # there because we don't know that they have been set up yet.
94
95         for p in ['CN=Configuration,DC=samba,DC=example,DC=com',
96                   'DC=samba,DC=example,DC=com',
97                   'CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com']:
98             self.assertRegexpMatches(
99                 inbound,
100                 r'%s\n'
101                 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
102                 r'\t\tDSA object GUID: %s\n'
103                 r'\t\tLast attempt @ [^\n]+\n'
104                 r'\t\t\d+ consecutive failure\(s\).\n'
105                 r'\t\tLast success @ [^\n]+\n'
106                 r'\n' % (p, GUID_RE),
107                 msg="%s inbound missing" % p)
108
109             self.assertRegexpMatches(
110                 outbound,
111                 r'%s\n'
112                 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
113                 r'\t\tDSA object GUID: %s\n'
114                 r'\t\tLast attempt @ [^\n]+\n'
115                 r'\t\t\d+ consecutive failure\(s\).\n'
116                 r'\t\tLast success @ [^\n]+\n'
117                 r'\n' % (p, GUID_RE),
118                 msg="%s outbound missing" % p)
119
120         self.assertRegexpMatches(conn,
121                                  r'Connection --\n'
122                                  r'\tConnection name: %s\n'
123                                  r'\tEnabled        : TRUE\n'
124                                  r'\tServer DNS name : \w+.samba.example.com\n'
125                                  r'\tServer DN name  : %s'
126                                  r'\n' % (GUID_RE, DN_RE))
127
128     def test_samba_tool_showrepl_json(self):
129         """Tests 'samba-tool drs showrepl --json' command.
130         """
131         out = self.check_output("samba-tool drs showrepl %s %s --json" %
132                                 (self.dc1, self.cmdline_creds))
133
134         d = json.loads(out)
135         self.assertEqual(set(d), set(['repsFrom',
136                                       'repsTo',
137                                       "NTDSConnections",
138                                       "dsa"]))
139
140         # dsa
141         for k in ["objectGUID", "invocationId"]:
142             self.assertRegexpMatches(d['dsa'][k], '^%s$' % GUID_RE)
143         self.assertTrue(isinstance(d['dsa']["options"], int))
144
145         # repsfrom and repsto
146         for reps in (d['repsFrom'], d['repsTo']):
147             for r in reps:
148                 for k in ('NC dn', "NTDS DN"):
149                     self.assertRegexpMatches(r[k], '^%s$' % DN_RE)
150                 for k in ("last attempt time",
151                           "last attempt message",
152                           "last success"):
153                     self.assertTrue(isinstance(r[k], json_str))
154                 self.assertRegexpMatches(r["DSA objectGUID"], '^%s$' % GUID_RE)
155                 self.assertTrue(isinstance(r["consecutive failures"], int))
156
157         # ntdsconnection
158         for n in d["NTDSConnections"]:
159             self.assertRegexpMatches(n["dns name"],
160                                      r'^[\w]+\.samba\.example\.com$')
161             self.assertRegexpMatches(n["name"], "^%s$" % GUID_RE)
162             self.assertTrue(isinstance(n['enabled'], bool))
163             self.assertTrue(isinstance(n['options'], int))
164             self.assertTrue(isinstance(n['replicates NC'], list))
165             self.assertRegexpMatches(n["remote DN"], "^%s$" % DN_RE)
166
167     def _force_all_reps(self, samdb, dc, direction):
168         if direction == 'inbound':
169             info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_NEIGHBORS
170         elif direction == 'outbound':
171             info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_REPSTO
172         else:
173             raise ValueError("expected 'inbound' or 'outbound'")
174
175         self._enable_all_repl(dc)
176         lp = self.get_loadparm()
177         creds = self.get_credentials()
178         drsuapi_conn, drsuapi_handle, _ = drs_utils.drsuapi_connect(dc, lp, creds)
179         req1 = drsuapi.DsReplicaGetInfoRequest1()
180         req1.info_type = info_type
181         _, info = drsuapi_conn.DsReplicaGetInfo(drsuapi_handle, 1, req1)
182         for x in info.array:
183             # you might think x.source_dsa_address was the thing, but no.
184             # and we need to filter out RODCs and deleted DCs
185
186             res = []
187             try:
188                 res = samdb.search(base=x.source_dsa_obj_dn,
189                                    scope=ldb.SCOPE_BASE,
190                                    attrs=['msDS-isRODC', 'isDeleted'],
191                                    controls=['show_deleted:0'])
192             except ldb.LdbError as e:
193                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
194                     raise
195
196             if (len(res) == 0 or
197                 len(res[0].get('msDS-isRODC', '')) > 0 or
198                 res[0]['isDeleted'] == 'TRUE'):
199                 continue
200
201             dsa_dn = str(ldb.Dn(samdb, x.source_dsa_obj_dn).parent())
202             try:
203                 res = samdb.search(base=dsa_dn,
204                                    scope=ldb.SCOPE_BASE,
205                                    attrs=['dNSHostName'])
206             except ldb.LdbError as e:
207                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
208                     raise
209                 continue
210
211             if len(res) == 0:
212                 print("server %s has no dNSHostName" % dsa_dn)
213                 continue
214
215             remote = res[0].get('dNSHostName', [''])[0]
216             if remote:
217                 self._enable_all_repl(remote)
218
219             if direction == 'inbound':
220                 src, dest = remote, dc
221             else:
222                 src, dest = dc, remote
223             self._net_drs_replicate(dest, src, forced=True)
224
225     def test_samba_tool_showrepl_summary_all_good(self):
226         """Tests 'samba-tool drs showrepl --summary' command."""
227         # To be sure that all is good we need to force replication
228         # with everyone (because others might have it turned off), and
229         # turn replication on for them in case they suddenly decide to
230         # try again.
231         #
232         # We don't restore them to the non-auto-replication state.
233         samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
234                                self.cmdline_creds)
235         self._enable_all_repl(self.dc1)
236         self._force_all_reps(samdb1, self.dc1, 'inbound')
237         self._force_all_reps(samdb1, self.dc1, 'outbound')
238
239         out = self.check_output("samba-tool drs showrepl --summary %s %s" %
240                                 (self.dc1, self.cmdline_creds))
241         self.assertStringsEqual(out, "[ALL GOOD]\n")
242
243         out = self.check_output("samba-tool drs showrepl --summary "
244                                 "--color=yes %s %s" %
245                                 (self.dc1, self.cmdline_creds))
246         self.assertStringsEqual(out, "\033[1;32m[ALL GOOD]\033[0m\n")
247
248         # --verbose output is still quiet when all is good.
249         out = self.check_output("samba-tool drs showrepl --summary -v %s %s" %
250                                 (self.dc1, self.cmdline_creds))
251         self.assertStringsEqual(out, "[ALL GOOD]\n")
252         out = self.check_output("samba-tool drs showrepl --summary -v "
253                                 "--color=yes %s %s" %
254                                 (self.dc1, self.cmdline_creds))
255         self.assertStringsEqual(out, "\033[1;32m[ALL GOOD]\033[0m\n")
256
257     def test_samba_tool_showrepl_summary_forced_failure(self):
258         """Tests 'samba-tool drs showrepl --summary' command when we break the
259         network on purpose.
260         """
261         self.addCleanup(self._enable_all_repl, self.dc1)
262         self._disable_all_repl(self.dc1)
263
264         samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
265                                self.cmdline_creds)
266         samdb2 = self.getSamDB("-H", "ldap://%s" % self.dc2, "-U",
267                                self.cmdline_creds)
268         domain_dn = samdb1.domain_dn()
269
270         # Add some things to NOT replicate
271         ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
272         ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
273         samdb1.add({
274             "dn": ou1,
275             "objectclass": "organizationalUnit"
276         })
277         self.addCleanup(samdb1.delete, ou1, ['tree_delete:1'])
278         samdb2.add({
279             "dn": ou2,
280             "objectclass": "organizationalUnit"
281         })
282         self.addCleanup(samdb2.delete, ou2, ['tree_delete:1'])
283
284         dn1 = 'cn=u1.%%d,%s' % (ou1)
285         dn2 = 'cn=u2.%%d,%s' % (ou2)
286
287         try:
288             for i in range(100):
289                 samdb1.add({
290                     "dn": dn1 % i,
291                     "objectclass": "user"
292                 })
293                 samdb2.add({
294                     "dn": dn2 % i,
295                     "objectclass": "user"
296                 })
297                 out = self.check_output("samba-tool drs showrepl --summary -v "
298                                         "%s %s" %
299                                         (self.dc1, self.cmdline_creds))
300                 self.assertStringsEqual('[ALL GOOD]', out, strip=True)
301                 out = self.check_output("samba-tool drs showrepl --summary -v "
302                                         "--color=yes %s %s" %
303                                         (self.dc2, self.cmdline_creds))
304                 self.assertIn('[ALL GOOD]', out)
305
306         except samba.tests.BlackboxProcessError as e:
307             print("Good, failed as expected after %d rounds: %r" % (i, e.cmd))
308             self.assertIn('There are failing connections', e.stdout)
309             self.assertRegexpMatches(
310                 e.stdout,
311                 r'result 845[67] '
312                 r'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
313                 msg=("The process should have failed "
314                      "because replication was forced off, "
315                      "but it failed for some other reason."))
316             self.assertIn('consecutive failure(s).', e.stdout)
317         else:
318             self.fail("No DRS failure noticed after 100 rounds of trying")