1 # Blackbox tests for "samba-tool drs" command
2 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Blackbox tests for samba-tool drs showrepl."""
22 from samba.dcerpc import drsuapi
23 from samba import drs_utils
28 from samba.common import get_string
30 GUID_RE = r'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
31 HEX8_RE = r'0x[\da-f]{8}'
32 DN_RE = r'(?:(?:CN|DC)=[\\:\w -]+,)+DC=com'
35 class SambaToolDrsShowReplTests(drs_base.DrsBaseTestCase):
36 """Blackbox test case for samba-tool drs."""
39 super(SambaToolDrsShowReplTests, self).setUp()
41 self.dc1 = samba.tests.env_get_var_value("DC1")
42 self.dc2 = samba.tests.env_get_var_value("DC2")
44 creds = self.get_credentials()
45 self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(),
49 def test_samba_tool_showrepl(self):
50 """Tests 'samba-tool drs showrepl' command.
52 # Output should be like:
53 # <site-name>/<domain-name>
54 # DSA Options: <hex-options>
55 # DSA object GUID: <DSA-object-GUID>
56 # DSA invocationId: <DSA-invocationId>
57 # <Inbound-connections-list>
58 # <Outbound-connections-list>
61 # TODO: Perhaps we should check at least for
62 # DSA's objectGUDI and invocationId
63 out = self.check_output("samba-tool drs showrepl "
64 "%s %s" % (self.dc1, self.cmdline_creds))
67 # We want to assert that we are getting the same results, but
68 # dates and GUIDs change randomly.
70 # There are sections with headers like ==== THIS ===="
74 _conn, conn) = out.split("====")
76 self.assertEqual(_inbound, ' INBOUND NEIGHBORS ')
77 self.assertEqual(_outbound, ' OUTBOUND NEIGHBORS ')
78 self.assertEqual(_conn, ' KCC CONNECTION OBJECTS ')
80 self.assertRegex(header,
81 r'^Default-First-Site-Name\\LOCALDC\s+'
83 r"DSA object GUID: %s\s+"
84 r"DSA invocationId: %s" %
85 (HEX8_RE, GUID_RE, GUID_RE))
87 # We don't assert the DomainDnsZones and ForestDnsZones are
88 # there because we don't know that they have been set up yet.
90 for p in ['CN=Configuration,DC=samba,DC=example,DC=com',
91 'DC=samba,DC=example,DC=com',
92 'CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com']:
96 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
97 r'\t\tDSA object GUID: %s\n'
98 r'\t\tLast attempt @ [^\n]+\n'
99 r'\t\t\d+ consecutive failure\(s\).\n'
100 r'\t\tLast success @ [^\n]+\n'
101 r'\n' % (p, GUID_RE),
102 msg="%s inbound missing" % p)
107 r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
108 r'\t\tDSA object GUID: %s\n'
109 r'\t\tLast attempt @ [^\n]+\n'
110 r'\t\t\d+ consecutive failure\(s\).\n'
111 r'\t\tLast success @ [^\n]+\n'
112 r'\n' % (p, GUID_RE),
113 msg="%s outbound missing" % p)
115 self.assertRegex(conn,
117 r'\tConnection name: %s\n'
118 r'\tEnabled : TRUE\n'
119 r'\tServer DNS name : \w+.samba.example.com\n'
120 r'\tServer DN name : %s'
121 r'\n' % (GUID_RE, DN_RE))
123 def test_samba_tool_showrepl_json(self):
124 """Tests 'samba-tool drs showrepl --json' command.
126 out = self.check_output("samba-tool drs showrepl %s %s --json" %
127 (self.dc1, self.cmdline_creds))
128 d = json.loads(get_string(out))
129 self.assertEqual(set(d), set(['repsFrom',
135 for k in ["objectGUID", "invocationId"]:
136 self.assertRegex(d['dsa'][k], '^%s$' % GUID_RE)
137 self.assertTrue(isinstance(d['dsa']["options"], int))
139 # repsfrom and repsto
140 for reps in (d['repsFrom'], d['repsTo']):
142 for k in ('NC dn', "NTDS DN"):
143 self.assertRegex(r[k], '^%s$' % DN_RE)
144 for k in ("last attempt time",
145 "last attempt message",
147 self.assertTrue(isinstance(r[k], str))
148 self.assertRegex(r["DSA objectGUID"], '^%s$' % GUID_RE)
149 self.assertTrue(isinstance(r["consecutive failures"], int))
152 for n in d["NTDSConnections"]:
153 self.assertRegex(n["dns name"],
154 r'^[\w]+\.samba\.example\.com$')
155 self.assertRegex(n["name"], "^%s$" % GUID_RE)
156 self.assertTrue(isinstance(n['enabled'], bool))
157 self.assertTrue(isinstance(n['options'], int))
158 self.assertTrue(isinstance(n['replicates NC'], list))
159 self.assertRegex(n["remote DN"], "^%s$" % DN_RE)
161 def _force_all_reps(self, samdb, dc, direction):
162 if direction == 'inbound':
163 info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_NEIGHBORS
164 elif direction == 'outbound':
165 info_type = drsuapi.DRSUAPI_DS_REPLICA_INFO_REPSTO
167 raise ValueError("expected 'inbound' or 'outbound'")
169 self._enable_all_repl(dc)
170 lp = self.get_loadparm()
171 creds = self.get_credentials()
172 drsuapi_conn, drsuapi_handle, _ = drs_utils.drsuapi_connect(dc, lp, creds)
173 req1 = drsuapi.DsReplicaGetInfoRequest1()
174 req1.info_type = info_type
175 _, info = drsuapi_conn.DsReplicaGetInfo(drsuapi_handle, 1, req1)
177 # you might think x.source_dsa_address was the thing, but no.
178 # and we need to filter out RODCs and deleted DCs
182 res = samdb.search(base=x.source_dsa_obj_dn,
183 scope=ldb.SCOPE_BASE,
184 attrs=['msDS-isRODC', 'isDeleted'],
185 controls=['show_deleted:0'])
186 except ldb.LdbError as e:
187 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
191 len(res[0].get('msDS-isRODC', '')) > 0 or
192 res[0]['isDeleted'] == 'TRUE'):
195 dsa_dn = str(ldb.Dn(samdb, x.source_dsa_obj_dn).parent())
197 res = samdb.search(base=dsa_dn,
198 scope=ldb.SCOPE_BASE,
199 attrs=['dNSHostName'])
200 except ldb.LdbError as e:
201 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
206 print("server %s has no dNSHostName" % dsa_dn)
209 remote = res[0].get('dNSHostName', [''])[0]
211 self._enable_all_repl(remote)
213 if direction == 'inbound':
214 src, dest = remote, dc
216 src, dest = dc, remote
217 self._net_drs_replicate(dest, src, forced=True)
219 def test_samba_tool_showrepl_pull_summary_all_good(self):
220 """Tests 'samba-tool drs showrepl --pull-summary' command."""
221 # To be sure that all is good we need to force replication
222 # with everyone (because others might have it turned off), and
223 # turn replication on for them in case they suddenly decide to
226 # We don't restore them to the non-auto-replication state.
227 samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
229 self._enable_all_repl(self.dc1)
230 self._force_all_reps(samdb1, self.dc1, 'inbound')
231 self._force_all_reps(samdb1, self.dc1, 'outbound')
232 old_no_color = os.environ.get('NO_COLOR')
233 all_good_green = "\033[1;32m[ALL GOOD]\033[0m\n"
234 all_good = "[ALL GOOD]\n"
237 out = self.check_output(
238 "samba-tool drs showrepl --pull-summary %s %s" %
239 (self.dc1, self.cmdline_creds))
240 out = get_string(out)
241 self.assertStringsEqual(out, all_good)
242 out = get_string(out)
244 out = self.check_output("samba-tool drs showrepl --pull-summary "
245 "--color=yes %s %s" %
246 (self.dc1, self.cmdline_creds))
247 out = get_string(out)
248 self.assertStringsEqual(out, all_good_green)
250 # --verbose output is still quiet when all is good.
251 out = self.check_output(
252 "samba-tool drs showrepl --pull-summary -v %s %s" %
253 (self.dc1, self.cmdline_creds))
254 out = get_string(out)
255 self.assertStringsEqual(out, all_good)
257 out = self.check_output("samba-tool drs showrepl --pull-summary -v "
258 "--color=always %s %s" %
259 (self.dc1, self.cmdline_creds))
260 out = get_string(out)
261 self.assertStringsEqual(out, all_good_green)
263 out = self.check_output("samba-tool drs showrepl --pull-summary -v "
264 "--color=never %s %s" %
265 (self.dc1, self.cmdline_creds))
266 out = get_string(out)
267 self.assertStringsEqual(out, all_good)
269 os.environ['NO_COLOR'] = 'bean'
271 out = self.check_output("samba-tool drs showrepl --pull-summary -v "
272 "--color=auto %s %s" %
273 (self.dc1, self.cmdline_creds))
274 out = get_string(out)
275 self.assertStringsEqual(out, all_good)
277 os.environ['NO_COLOR'] = ''
279 out = self.check_output("samba-tool drs showrepl --pull-summary -v "
280 "--color=auto %s %s" %
281 (self.dc1, self.cmdline_creds))
282 out = get_string(out)
283 self.assertStringsEqual(out, all_good_green)
285 except samba.tests.BlackboxProcessError as e:
288 if old_no_color is None:
289 os.environ.pop('NO_COLOR', None)
291 os.environ['NO_COLOR'] = old_no_color
293 def test_samba_tool_showrepl_summary_forced_failure(self):
294 """Tests 'samba-tool drs showrepl --summary' command when we break the
297 self.addCleanup(self._enable_all_repl, self.dc1)
298 self._disable_all_repl(self.dc1)
300 samdb1 = self.getSamDB("-H", "ldap://%s" % self.dc1, "-U",
302 samdb2 = self.getSamDB("-H", "ldap://%s" % self.dc2, "-U",
304 domain_dn = samdb1.domain_dn()
306 # Add some things to NOT replicate
307 ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
308 ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
311 "objectclass": "organizationalUnit"
313 self.addCleanup(samdb1.delete, ou1, ['tree_delete:1'])
316 "objectclass": "organizationalUnit"
318 self.addCleanup(samdb2.delete, ou2, ['tree_delete:1'])
320 dn1 = 'cn=u1.%%d,%s' % (ou1)
321 dn2 = 'cn=u2.%%d,%s' % (ou2)
327 "objectclass": "user"
331 "objectclass": "user"
333 out = self.check_output("samba-tool drs showrepl --summary -v "
335 (self.dc1, self.cmdline_creds))
336 out = get_string(out)
337 self.assertStringsEqual('[ALL GOOD]', out, strip=True)
338 out = self.check_output("samba-tool drs showrepl --summary -v "
339 "--color=yes %s %s" %
340 (self.dc2, self.cmdline_creds))
341 out = get_string(out)
342 self.assertIn('[ALL GOOD]', out)
344 except samba.tests.BlackboxProcessError as e:
345 e_stdout = get_string(e.stdout)
346 e_stderr = get_string(e.stderr)
347 print("Good, failed as expected after %d rounds: %r" % (i, e.cmd))
348 self.assertIn('There are failing connections', e_stdout,
349 msg=('stdout: %r\nstderr: %r\nretcode: %s'
350 '\nmessage: %r\ncmd: %r') % (e_stdout,
358 r'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
359 msg=("The process should have failed "
360 "because replication was forced off, "
361 "but it failed for some other reason."))
362 self.assertIn('consecutive failure(s).', e_stdout)
364 self.fail("No DRS failure noticed after 100 rounds of trying")