PEP8: fix E401: multiple imports on one line
[samba.git] / python / samba / tests / samba_tool / visualize_drs.py
1 # -*- coding: utf-8 -*-
2 # Originally based on tests for samba.kcc.ldif_import_export.
3 # Copyright (C) Andrew Bartlett 2015, 2018
4 #
5 # by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 """Tests for samba-tool visualize using the vampire DC and promoted DC
21 environments. For most tests we assume we can't assert much about what
22 state they are in, so we mainly check for command failure, but for
23 others we try to grasp control of replication and make more specific
24 assertions.
25 """
26
27 from __future__ import print_function
28 import os
29 import re
30 import random
31 import subprocess
32 from samba.tests.samba_tool.base import SambaToolCmdTest
33
# Set to True to have the tests print the visualisations they parse.
VERBOSE = False

# For each selftest environment, the DNs of the DSAs (domain controller
# server objects) that are listed for it. All of them live under the
# same site container, so only the server CN varies.
ENV_DSAS = {
    env: ['CN=%s,CN=Servers,CN=Default-First-Site-Name,CN=Sites,'
          'CN=Configuration,DC=samba,DC=example,DC=com' % server
          for server in servers]
    for env, servers in (
        ('promoted_dc', ('PROMOTEDVDC', 'LOCALDC')),
        ('vampire_dc', ('LOCALDC', 'LOCALVAMPIREDC')),
    )
}
42
43
def set_auto_replication(dc, allow):
    """Turn automatic DRS replication on or off for one DC.

    Runs ``bin/samba-tool drs options`` against ``dc`` once for each of
    the DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL DSA options.
    When ``allow`` is true the options are passed with a '-' prefix,
    otherwise with a '+' prefix.

    Credentials are taken from the USERNAME and PASSWORD environment
    variables. subprocess.CalledProcessError is raised if samba-tool
    exits with a non-zero status.
    """
    user_arg = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
    if allow:
        prefix = '-'
    else:
        prefix = '+'

    for flag in ('DISABLE_INBOUND_REPL', 'DISABLE_OUTBOUND_REPL'):
        subprocess.check_call(['bin/samba-tool',
                               'drs', 'options',
                               user_arg, dc,
                               "--dsa-option=%s%s" % (prefix, flag)])
56
57
def force_replication(src, dest, base):
    """Run a forced ``samba-tool drs replicate`` of ``base``.

    The command line is ``bin/samba-tool drs replicate <dest> <src>
    <base>`` followed by the credentials and ``--sync-forced``.

    Credentials are taken from the USERNAME and PASSWORD environment
    variables. subprocess.CalledProcessError is raised if samba-tool
    exits with a non-zero status.
    """
    user_arg = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
    argv = ['bin/samba-tool', 'drs', 'replicate']
    argv += [dest, src, base]
    argv += [user_arg, '--sync-forced']
    subprocess.check_call(argv)
67
68
def get_utf8_matrix(s):
    """Parse a UTF-8 distance matrix into a dict of dicts.

    This understands the graphical table *just* well enough for the
    tests in this file. Only lines containing '·' (the marker used on
    the matrix diagonal) are treated as rows. Each row starts with a DN
    such as 'CN=FOO+...' or 'CN=FOO,CN=x,DC=...', from which the
    lower-cased FOO is taken as the row name, followed by one cell per
    row of the matrix.

    Returns ``{row_name: {column_name: value}}`` where the diagonal is
    0, digit cells are ints, and anything else is infinity.
    """
    # decolourise first
    plain = re.sub("\033" r"\[[^m]+m", '', s)

    # matrix rows have '·' on the diagonal; treat it as a zero distance
    rows = []
    for line in plain.split('\n'):
        if '·' in line:
            rows.append(line.strip().replace('·', '0'))

    names = []
    values = []
    for row in rows:
        # Split from the right so that at most one field per column is
        # peeled off and whatever is left over is the DN.
        fields = row.rsplit(None, len(rows))
        label = fields[0]
        cells = fields[1:]
        # we want the FOO in 'CN=FOO+' or 'CN=FOO,CN=x,DC=...'
        names.append(re.match(r'cn=([^+,]+)', label.lower()).group(1))
        if len(cells) == 1:
            # this is a single-digit matrix, no spaces between cells
            cells = list(cells[0])
        values.append([int(cell) if cell.isdigit() else float('inf')
                       for cell in cells])

    return {name: dict(zip(names, row_values))
            for name, row_values in zip(names, values)}
95
96
class SambaToolVisualizeDrsTest(SambaToolCmdTest):
    """Run ``samba-tool visualize`` subcommands against live DCs.

    The DCs and credentials come from the SERVER, DC_SERVER, USERNAME
    and PASSWORD environment variables. Most tests only check that the
    command succeeds; test_uptodateness_stop_replication_domain also
    takes control of replication and asserts on the matrix values.
    """

    @staticmethod
    def _creds():
        """Return 'USERNAME%PASSWORD' as expected by the -U option."""
        return "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])

    def _check_visualize(self, *args):
        """Run ``samba-tool visualize <args>`` and assert it succeeded.

        Returns the command's standard output.
        """
        (result, out, err) = self.runsubcmd("visualize", *args)
        self.assertCmdSuccess(result, out, err)
        return out

    def _domain_matrix(self, server, heading, dcs, *extra_args):
        """Fetch and parse the DOMAIN partition uptodateness matrix.

        Runs ``visualize uptodateness -r`` against ``server`` with
        colour and UTF-8 output enabled, plus any ``extra_args``. When
        VERBOSE is set the raw output is printed under ``heading``
        before anything is asserted, so it is visible even on failure.
        The parsed matrix is checked with assert_matrix_validity for
        ``dcs`` and then returned.
        """
        (result, out, err) = self.runsubcmd("visualize", "uptodateness",
                                            "-r",
                                            '-H', "ldap://%s" % server,
                                            '-U', self._creds(),
                                            '--color=yes',
                                            '--utf8', '-S',
                                            '--partition', 'DOMAIN',
                                            *extra_args)
        if VERBOSE:
            print("========", heading, "=========")
            print(out)
        self.assertCmdSuccess(result, out, err)
        matrix = get_utf8_matrix(out)
        self.assert_matrix_validity(matrix, dcs)
        return matrix

    def test_ntdsconn(self):
        self._check_visualize("ntdsconn",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(),
                              '--color=no', '-S')

    def test_ntdsconn_remote(self):
        self._check_visualize("ntdsconn",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(),
                              '--color=no', '-S', '-r')

    def test_reps(self):
        self._check_visualize("reps",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(),
                              '--color=no', '-S')

    def test_uptodateness_all_partitions(self):
        creds = self._creds()
        # We will check that the visualisation works for the two
        # stopped DCs, but we can't make assertions that the output
        # will be the same because there may be replication between
        # the two calls. Stopping the replication on these ones is not
        # enough because there are other DCs about.
        for dc in (os.environ["SERVER"], os.environ["DC_SERVER"]):
            self._check_visualize("uptodateness",
                                  "-r",
                                  '-H', "ldap://%s" % dc,
                                  '-U', creds,
                                  '--color=no', '-S')

    def test_uptodateness_partitions(self):
        creds = self._creds()
        dc1 = os.environ["SERVER"]
        for part in ["CONFIGURATION",
                     "SCHEMA",
                     "DNSDOMAIN",
                     "DNSFOREST"]:
            self._check_visualize("uptodateness",
                                  "-r",
                                  '-H', "ldap://%s" % dc1,
                                  '-U', creds,
                                  '--color=no', '-S',
                                  '--partition', part)

    def assert_matrix_validity(self, matrix, dcs=()):
        """Assert every DC in ``dcs`` has a row and the diagonal is 0."""
        for dc in dcs:
            self.assertIn(dc, matrix)
        for k, row in matrix.items():
            self.assertEqual(row[k], 0)

    def test_uptodateness_stop_replication_domain(self):
        creds = self._creds()
        dc1 = os.environ["SERVER"]
        dc2 = os.environ["DC_SERVER"]
        dcs = [dc1, dc2]
        # Whatever happens below, leave both DCs replicating again.
        self.addCleanup(set_auto_replication, dc1, True)
        self.addCleanup(set_auto_replication, dc2, True)

        samdb1 = self.getSamDB("-H", "ldap://%s" % dc1, "-U", creds)
        samdb2 = self.getSamDB("-H", "ldap://%s" % dc2, "-U", creds)

        domain_dn = samdb1.domain_dn()
        self.assertEqual(domain_dn, samdb2.domain_dn(),
                         "We expected the same domain_dn across DCs")

        # Randomly named OUs to hold the objects this test creates.
        ou1 = "OU=dc1.%x,%s" % (random.randrange(1 << 64), domain_dn)
        ou2 = "OU=dc2.%x,%s" % (random.randrange(1 << 64), domain_dn)
        samdb1.add({
            "dn": ou1,
            "objectclass": "organizationalUnit"
        })
        samdb2.add({
            "dn": ou2,
            "objectclass": "organizationalUnit"
        })

        set_auto_replication(dc1, False)
        self._domain_matrix(dc1, "dc1 replication is now off", dcs)

        force_replication(dc2, dc1, domain_dn)
        matrix = self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc2, dc1), dcs)
        self.assertEqual(matrix[dc1][dc2], 0)

        force_replication(dc1, dc2, domain_dn)
        matrix = self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc1, dc2), dcs)
        self.assertEqual(matrix[dc2][dc1], 0)

        # '%%d' survives the first formatting as '%d', giving per-user
        # DN templates under each OU.
        dn1 = 'cn=u1.%%d,%s' % (ou1)
        dn2 = 'cn=u2.%%d,%s' % (ou2)

        for i in range(10):
            samdb1.add({
                "dn": dn1 % i,
                "objectclass": "user"
            })

        matrix = self._domain_matrix(dc1, "added 10 users on %s" % dc1, dcs)
        # dc2's view of dc1 should now be 10 changes out of date
        self.assertEqual(matrix[dc2][dc1], 10)

        for i in range(10):
            samdb2.add({
                "dn": dn2 % i,
                "objectclass": "user"
            })

        matrix = self._domain_matrix(dc1, "added 10 users on %s" % dc2, dcs)
        # dc1's view of dc2 is probably 11 changes out of date
        self.assertGreaterEqual(matrix[dc1][dc2], 10)

        for i in range(10, 101):
            samdb1.add({
                "dn": dn1 % i,
                "objectclass": "user"
            })
            samdb2.add({
                "dn": dn2 % i,
                "objectclass": "user"
            })

        matrix = self._domain_matrix(dc1, "added 91 users on both", dcs)
        # the difference here should be ~101.
        self.assertGreaterEqual(matrix[dc1][dc2], 100)
        self.assertGreaterEqual(matrix[dc2][dc1], 100)

        matrix = self._domain_matrix(dc1, "with --max-digits 2", dcs,
                                     '--max-digits', '2')
        # visualising with 2 digits mean these overflow into infinity
        self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
        self.assertGreaterEqual(matrix[dc2][dc1], 1e99)

        matrix = self._domain_matrix(dc1, "with --max-digits 1", dcs,
                                     '--max-digits', '1')
        # visualising with 1 digit means these overflow into infinity
        self.assertGreaterEqual(matrix[dc1][dc2], 1e99)
        self.assertGreaterEqual(matrix[dc2][dc1], 1e99)

        force_replication(dc2, dc1, domain_dn)
        matrix = self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc2, dc1), dcs)
        self.assertEqual(matrix[dc1][dc2], 0)

        force_replication(dc1, dc2, domain_dn)
        matrix = self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc1, dc2), dcs)
        self.assertEqual(matrix[dc2][dc1], 0)

        samdb1.delete(ou1, ['tree_delete:1'])
        samdb2.delete(ou2, ['tree_delete:1'])

        matrix = self._domain_matrix(
            dc1, "tree delete ou1 on %s and ou2 on %s" % (dc1, dc2), dcs)
        self.assertGreaterEqual(matrix[dc1][dc2], 100)
        self.assertGreaterEqual(matrix[dc2][dc1], 100)

        set_auto_replication(dc1, True)
        # We can't assert actual values after this because
        # auto-replication is on and things will change underneath us.
        self._domain_matrix(dc1, "replication is now on", dcs)

        self._domain_matrix(dc2, "%s's view" % dc2, dcs)

        force_replication(dc1, dc2, domain_dn)
        self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc1, dc2), dcs)

        force_replication(dc2, dc1, domain_dn)
        self._domain_matrix(
            dc1, "forced replication %s -> %s" % (dc2, dc1), dcs)

        self._domain_matrix(dc2, "%s's view" % dc2, dcs)

    def test_reps_remote(self):
        self._check_visualize("reps",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(),
                              '--color=no', '-S', '-r')

    def test_ntdsconn_dot(self):
        self._check_visualize("ntdsconn",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(), '--dot',
                              '--color=no', '-S')

    def test_ntdsconn_remote_dot(self):
        self._check_visualize("ntdsconn",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(), '--dot',
                              '--color=no', '-S', '-r')

    def test_reps_dot(self):
        self._check_visualize("reps",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(), '--dot',
                              '--color=no', '-S')

    def test_reps_remote_dot(self):
        self._check_visualize("reps",
                              '-H', "ldap://%s" % os.environ["SERVER"],
                              '-U', self._creds(), '--dot',
                              '--color=no', '-S', '-r')