1 # -*- coding: utf-8 -*-
2 # Originally based on tests for samba.kcc.ldif_import_export.
3 # Copyright (C) Andrew Bartlett 2015, 2018
5 # by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Tests for samba-tool visualize using the vampire DC and promoted DC
21 environments. For most tests we assume we can't assert much about what
22 state they are in, so we mainly check for command failure, but for
others we try to grasp control of replication and make more specific
assertions.
"""
27 from __future__ import print_function
33 from samba.tests.samba_tool.base import SambaToolCmdTest
38 'promoted_dc': ['CN=PROMOTEDVDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
39 'CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
40 'vampire_dc': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com',
41 'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
53 def set_auto_replication(dc, allow):
54 credstring = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
55 on_or_off = '-' if allow else '+'
57 for opt in ['DISABLE_INBOUND_REPL',
58 'DISABLE_OUTBOUND_REPL']:
59 cmd = ['bin/samba-tool',
62 "--dsa-option=%s%s" % (on_or_off, opt)]
64 subprocess.check_call(cmd)
67 def force_replication(src, dest, base):
68 credstring = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
69 cmd = ['bin/samba-tool',
75 subprocess.check_call(cmd)
def get_utf8_matrix(s):
    """Parse the graphical UTF-8 table *just* well enough for our tests.

    ANSI colour sequences are stripped first. Every remaining line that
    contains '·' is treated as a matrix row; the '·' marks the diagonal
    and is read as 0. The row label is whatever precedes the last
    len(rows) whitespace-separated fields, and the name kept for it is
    the first CN value, lower-cased (the FOO in 'CN=FOO+' or
    'CN=FOO,CN=x,DC=...'). Cells that are not plain digits are read as
    infinity.

    Columns are taken to be in the same order as the rows.

    :param s: the text printed by the command
    :return: a dict of dicts, matrix[row_name][column_name] -> value
    :raises AttributeError: if a row label does not start with 'cn='
    """
    # decolourise first
    s = re.sub("\033" r"\[[^m]+m", '', s)
    lines = s.split('\n')
    # matrix rows have '·' on the diagonal
    rows = [x.strip().replace('·', '0') for x in lines if '·' in x]
    names = []
    values = []
    for r in rows:
        # Split from the right so that whitespace inside the label
        # does not disturb the value columns.
        parts = r.rsplit(None, len(rows))
        k, v = parts[0], parts[1:]
        # we want the FOO in 'CN=FOO+' or 'CN=FOO,CN=x,DC=...'
        k = re.match(r'cn=([^+,]+)', k.lower()).group(1)
        names.append(k)
        if len(v) == 1:  # this is a single-digit matrix, no spaces
            v = list(v[0])
        # 1e999 overflows to float infinity, marking "too big to show"
        values.append([int(x) if x.isdigit() else 1e999 for x in v])

    return {n1: dict(zip(names, row)) for n1, row in zip(names, values)}
106 class SambaToolVisualizeDrsTest(SambaToolCmdTest):
108 super(SambaToolVisualizeDrsTest, self).setUp()
110 def test_ntdsconn(self):
111 server = "ldap://%s" % os.environ["SERVER"]
112 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
113 (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
117 self.assertCmdSuccess(result, out, err)
def test_ntdsconn_remote(self):
    """Run `visualize ntdsconn` with '-r' against $SERVER.

    Only command success is asserted; the output is not inspected.
    """
    server = "ldap://%s" % os.environ["SERVER"]
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
                                        '-H', server,
                                        '-U', creds,
                                        '--color=no', '-S', '-r')
    self.assertCmdSuccess(result, out, err)
129 server = "ldap://%s" % os.environ["SERVER"]
130 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
131 (result, out, err) = self.runsubcmd("visualize", "reps",
135 self.assertCmdSuccess(result, out, err)
137 def test_uptodateness_all_partitions(self):
138 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
139 dc1 = os.environ["SERVER"]
140 dc2 = os.environ["DC_SERVER"]
141 # We will check that the visualisation works for the two
142 # stopped DCs, but we can't make assertions that the output
143 # will be the same because there may be replication between
144 # the two calls. Stopping the replication on these ones is not
145 # enough because there are other DCs about.
146 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
148 '-H', "ldap://%s" % dc1,
151 self.assertCmdSuccess(result, out, err)
153 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
155 '-H', "ldap://%s" % dc2,
158 self.assertCmdSuccess(result, out, err)
160 def test_uptodateness_partitions(self):
161 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
162 dc1 = os.environ["SERVER"]
163 for part in PARTITION_NAMES:
164 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
166 '-H', "ldap://%s" % dc1,
170 self.assertCmdSuccess(result, out, err)
def test_drs_uptodateness(self):
    """
    Test cmd `drs uptodateness`

    It should print info like this:

        DNSDOMAIN       failure: 4  median: 1.5  maximum: 2
        SCHEMA          failure: 4  median: 220.0  maximum: 439
        DOMAIN          failure: 1  median: 25  maximum: 25
        CONFIGURATION   failure: 1  median: 25  maximum: 25
        DNSFOREST       failure: 4  median: 1.5  maximum: 2

    The command is run against both $SERVER and $DC_SERVER.
    """
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    dc1 = os.environ["SERVER"]
    dc2 = os.environ["DC_SERVER"]
    for dc in [dc1, dc2]:
        (result, out, err) = self.runsubcmd("drs", "uptodateness",
                                            '-H', "ldap://%s" % dc,
                                            '-U', creds)
        self.assertCmdSuccess(result, out, err)
        # each partition name should be in output
        for part_name in PARTITION_NAMES:
            self.assertIn(part_name, out, msg=out)

        for line in out.splitlines():
            # check keyword in output
            for attr in ['maximum', 'median', 'failure']:
                self.assertIn(attr, line)
def test_drs_uptodateness_partition(self):
    """
    Test cmd `drs uptodateness --partition DOMAIN`

    It should print info like this:

        DOMAIN          failure: 1  median: 25  maximum: 25

    The command is run against both $SERVER and $DC_SERVER; exactly one
    output line, starting with 'DOMAIN', is expected each time.
    """
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    dc1 = os.environ["SERVER"]
    dc2 = os.environ["DC_SERVER"]
    for dc in [dc1, dc2]:
        (result, out, err) = self.runsubcmd("drs", "uptodateness",
                                            '-H', "ldap://%s" % dc,
                                            '-U', creds,
                                            '--partition', 'DOMAIN')
        self.assertCmdSuccess(result, out, err)

        lines = out.splitlines()
        # assertEqual, not the deprecated assertEquals alias, which no
        # longer exists in current unittest.
        self.assertEqual(len(lines), 1)

        line = lines[0]
        self.assertTrue(line.startswith('DOMAIN'))
def test_drs_uptodateness_json(self):
    """
    Test cmd `drs uptodateness --json`

    The output must parse as JSON, contain one entry per name in
    PARTITION_NAMES, and each entry must carry the 'maximum', 'median'
    and 'failure' keys. The command is run against both $SERVER and
    $DC_SERVER.
    """
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    dc1 = os.environ["SERVER"]
    dc2 = os.environ["DC_SERVER"]
    for dc in [dc1, dc2]:
        (result, out, err) = self.runsubcmd("drs", "uptodateness",
                                            '-H', "ldap://%s" % dc,
                                            '-U', creds,
                                            '--json')
        self.assertCmdSuccess(result, out, err)
        # should be json format
        obj = json.loads(out)
        # each partition name should be in json obj
        for part_name in PARTITION_NAMES:
            self.assertIn(part_name, obj)
            summary_obj = obj[part_name]
            for attr in ['maximum', 'median', 'failure']:
                self.assertIn(attr, summary_obj)
def test_drs_uptodateness_json_median(self):
    """
    Test cmd `drs uptodateness --json --median`

    The output must parse as JSON and contain one entry per name in
    PARTITION_NAMES; each entry must carry 'median' and must NOT carry
    'maximum' or 'failure'. The command is run against both $SERVER and
    $DC_SERVER.
    """
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    dc1 = os.environ["SERVER"]
    dc2 = os.environ["DC_SERVER"]
    for dc in [dc1, dc2]:
        (result, out, err) = self.runsubcmd("drs", "uptodateness",
                                            '-H', "ldap://%s" % dc,
                                            '-U', creds,
                                            '--json', '--median')
        self.assertCmdSuccess(result, out, err)
        # should be json format
        obj = json.loads(out)
        # each partition name should be in json obj
        for part_name in PARTITION_NAMES:
            self.assertIn(part_name, obj)
            summary_obj = obj[part_name]
            self.assertIn('median', summary_obj)
            self.assertNotIn('maximum', summary_obj)
            self.assertNotIn('failure', summary_obj)
def assert_matrix_validity(self, matrix, dcs=()):
    """Sanity-check a matrix as returned by get_utf8_matrix().

    :param matrix: dict of dicts, matrix[row_name][column_name] -> value
    :param dcs: names that must each be present as a row of the matrix
    :raises AssertionError: if a name in dcs is missing, or if any
        row's own (diagonal) entry is not 0
    """
    for dc in dcs:
        self.assertIn(dc, matrix)
    # the diagonal is a DC's view of itself, which must always be 0
    for k, row in matrix.items():
        self.assertEqual(row[k], 0)
306 def test_uptodateness_stop_replication_domain(self):
307 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
308 dc1 = os.environ["SERVER"]
309 dc2 = os.environ["DC_SERVER"]
310 self.addCleanup(set_auto_replication, dc1, True)
311 self.addCleanup(set_auto_replication, dc2, True)
313 def display(heading, out):
315 print("========", heading, "=========")
318 samdb1 = self.getSamDB("-H", "ldap://%s" % dc1, "-U", creds)
319 samdb2 = self.getSamDB("-H", "ldap://%s" % dc2, "-U", creds)
321 domain_dn = samdb1.domain_dn()
322 self.assertTrue(domain_dn == samdb2.domain_dn(),
323 "We expected the same domain_dn across DCs")
325 ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
326 ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
329 "objectclass": "organizationalUnit"
333 "objectclass": "organizationalUnit"
336 set_auto_replication(dc1, False)
337 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
339 '-H', "ldap://%s" % dc1,
343 '--partition', 'DOMAIN')
344 display("dc1 replication is now off", out)
345 self.assertCmdSuccess(result, out, err)
346 matrix = get_utf8_matrix(out)
347 self.assert_matrix_validity(matrix, [dc1, dc2])
349 force_replication(dc2, dc1, domain_dn)
350 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
352 '-H', "ldap://%s" % dc1,
356 '--partition', 'DOMAIN')
357 display("forced replication %s -> %s" % (dc2, dc1), out)
358 self.assertCmdSuccess(result, out, err)
359 matrix = get_utf8_matrix(out)
360 self.assert_matrix_validity(matrix, [dc1, dc2])
361 self.assertEqual(matrix[dc1][dc2], 0)
363 force_replication(dc1, dc2, domain_dn)
364 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
366 '-H', "ldap://%s" % dc1,
370 '--partition', 'DOMAIN')
371 display("forced replication %s -> %s" % (dc2, dc1), out)
372 self.assertCmdSuccess(result, out, err)
373 matrix = get_utf8_matrix(out)
374 self.assert_matrix_validity(matrix, [dc1, dc2])
375 self.assertEqual(matrix[dc2][dc1], 0)
377 dn1 = 'cn=u1.%%d,%s' % (ou1)
378 dn2 = 'cn=u2.%%d,%s' % (ou2)
383 "objectclass": "user"
386 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
388 '-H', "ldap://%s" % dc1,
392 '--partition', 'DOMAIN')
393 display("added 10 users on %s" % dc1, out)
394 self.assertCmdSuccess(result, out, err)
395 matrix = get_utf8_matrix(out)
396 self.assert_matrix_validity(matrix, [dc1, dc2])
397 # dc2's view of dc1 should now be 10 changes out of date
398 self.assertEqual(matrix[dc2][dc1], 10)
403 "objectclass": "user"
406 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
408 '-H', "ldap://%s" % dc1,
412 '--partition', 'DOMAIN')
413 display("added 10 users on %s" % dc2, out)
414 self.assertCmdSuccess(result, out, err)
415 matrix = get_utf8_matrix(out)
416 self.assert_matrix_validity(matrix, [dc1, dc2])
417 # dc1's view of dc2 is probably 11 changes out of date
418 self.assertGreaterEqual(matrix[dc1][dc2], 10)
420 for i in range(10, 101):
423 "objectclass": "user"
427 "objectclass": "user"
430 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
432 '-H', "ldap://%s" % dc1,
436 '--partition', 'DOMAIN')
437 display("added 91 users on both", out)
438 self.assertCmdSuccess(result, out, err)
439 matrix = get_utf8_matrix(out)
440 self.assert_matrix_validity(matrix, [dc1, dc2])
441 # the difference here should be ~101.
442 self.assertGreaterEqual(matrix[dc1][dc2], 100)
443 self.assertGreaterEqual(matrix[dc2][dc1], 100)
445 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
447 '-H', "ldap://%s" % dc1,
451 '--partition', 'DOMAIN',
453 display("with --max-digits 2", out)
454 self.assertCmdSuccess(result, out, err)
455 matrix = get_utf8_matrix(out)
456 self.assert_matrix_validity(matrix, [dc1, dc2])
457 # visualising with 2 digits mean these overflow into infinity
458 self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
459 self.assertGreaterEqual(matrix[dc2][dc1], 1e99)
461 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
463 '-H', "ldap://%s" % dc1,
467 '--partition', 'DOMAIN',
469 display("with --max-digits 1", out)
470 self.assertCmdSuccess(result, out, err)
471 matrix = get_utf8_matrix(out)
472 self.assert_matrix_validity(matrix, [dc1, dc2])
473 # visualising with 1 digit means these overflow into infinity
474 self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
475 self.assertGreaterEqual(matrix[dc2][dc1], 1e99)
477 force_replication(dc2, dc1, samdb1.domain_dn())
478 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
480 '-H', "ldap://%s" % dc1,
484 '--partition', 'DOMAIN')
486 display("forced replication %s -> %s" % (dc2, dc1), out)
487 self.assertCmdSuccess(result, out, err)
488 matrix = get_utf8_matrix(out)
489 self.assert_matrix_validity(matrix, [dc1, dc2])
490 self.assertEqual(matrix[dc1][dc2], 0)
492 force_replication(dc1, dc2, samdb2.domain_dn())
493 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
495 '-H', "ldap://%s" % dc1,
499 '--partition', 'DOMAIN')
501 display("forced replication %s -> %s" % (dc1, dc2), out)
502 self.assertCmdSuccess(result, out, err)
503 matrix = get_utf8_matrix(out)
504 self.assert_matrix_validity(matrix, [dc1, dc2])
505 self.assertEqual(matrix[dc2][dc1], 0)
507 samdb1.delete(ou1, ['tree_delete:1'])
508 samdb2.delete(ou2, ['tree_delete:1'])
510 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
512 '-H', "ldap://%s" % dc1,
516 '--partition', 'DOMAIN')
517 display("tree delete both ous on %s" % (dc1,), out)
518 self.assertCmdSuccess(result, out, err)
519 matrix = get_utf8_matrix(out)
520 self.assert_matrix_validity(matrix, [dc1, dc2])
521 self.assertGreaterEqual(matrix[dc1][dc2], 100)
522 self.assertGreaterEqual(matrix[dc2][dc1], 100)
524 set_auto_replication(dc1, True)
525 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
527 '-H', "ldap://%s" % dc1,
531 '--partition', 'DOMAIN')
532 display("replication is now on", out)
533 self.assertCmdSuccess(result, out, err)
534 matrix = get_utf8_matrix(out)
535 self.assert_matrix_validity(matrix, [dc1, dc2])
536 # We can't assert actual values after this because
537 # auto-replication is on and things will change underneath us.
539 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
541 '-H', "ldap://%s" % dc2,
545 '--partition', 'DOMAIN')
547 display("%s's view" % dc2, out)
548 self.assertCmdSuccess(result, out, err)
549 matrix = get_utf8_matrix(out)
550 self.assert_matrix_validity(matrix, [dc1, dc2])
552 force_replication(dc1, dc2, samdb2.domain_dn())
553 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
555 '-H', "ldap://%s" % dc1,
559 '--partition', 'DOMAIN')
561 display("forced replication %s -> %s" % (dc1, dc2), out)
562 self.assertCmdSuccess(result, out, err)
563 matrix = get_utf8_matrix(out)
564 self.assert_matrix_validity(matrix, [dc1, dc2])
566 force_replication(dc2, dc1, samdb2.domain_dn())
567 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
569 '-H', "ldap://%s" % dc1,
573 '--partition', 'DOMAIN')
574 display("forced replication %s -> %s" % (dc2, dc1), out)
575 self.assertCmdSuccess(result, out, err)
576 matrix = get_utf8_matrix(out)
577 self.assert_matrix_validity(matrix, [dc1, dc2])
579 (result, out, err) = self.runsubcmd("visualize", "uptodateness",
581 '-H', "ldap://%s" % dc2,
585 '--partition', 'DOMAIN')
586 display("%s's view" % dc2, out)
588 self.assertCmdSuccess(result, out, err)
589 matrix = get_utf8_matrix(out)
590 self.assert_matrix_validity(matrix, [dc1, dc2])
def test_reps_remote(self):
    """Run `visualize reps` with '-r' against $SERVER.

    Only command success is asserted; the output is not inspected.
    """
    server = "ldap://%s" % os.environ["SERVER"]
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    (result, out, err) = self.runsubcmd("visualize", "reps",
                                        '-H', server,
                                        '-U', creds,
                                        '--color=no', '-S', '-r')
    self.assertCmdSuccess(result, out, err)
601 def test_ntdsconn_dot(self):
602 server = "ldap://%s" % os.environ["SERVER"]
603 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
604 (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
606 '-U', creds, '--dot',
608 self.assertCmdSuccess(result, out, err)
def test_ntdsconn_remote_dot(self):
    """Run `visualize ntdsconn` with '--dot' and '-r' against $SERVER.

    Only command success is asserted; the output is not inspected.
    """
    server = "ldap://%s" % os.environ["SERVER"]
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    (result, out, err) = self.runsubcmd("visualize", "ntdsconn",
                                        '-H', server,
                                        '-U', creds, '--dot',
                                        '--color=no', '-S', '-r')
    self.assertCmdSuccess(result, out, err)
619 def test_reps_dot(self):
620 server = "ldap://%s" % os.environ["SERVER"]
621 creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
622 (result, out, err) = self.runsubcmd("visualize", "reps",
624 '-U', creds, '--dot',
626 self.assertCmdSuccess(result, out, err)
def test_reps_remote_dot(self):
    """Run `visualize reps` with '--dot' and '-r' against $SERVER.

    Only command success is asserted; the output is not inspected.
    """
    server = "ldap://%s" % os.environ["SERVER"]
    creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
    (result, out, err) = self.runsubcmd("visualize", "reps",
                                        '-H', server,
                                        '-U', creds, '--dot',
                                        '--color=no', '-S', '-r')
    self.assertCmdSuccess(result, out, err)