5410ed68b243c887b70f63a505cff5f1f18b9529
[samba.git] / source4 / torture / drs / python / replica_sync.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 import sys
31 import time
32 import random
33 import os
34
35 import drs_base
36 import samba.tests
37
38 from ldb import (
39     ERR_NO_SUCH_OBJECT,
40     LdbError,
41     SCOPE_BASE,
42     Message,
43     FLAG_MOD_ADD,
44     FLAG_MOD_REPLACE,
45     )
46
47
48 class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
49     """Intended as a black box test case for DsReplicaSync
50        implementation. It should test the behavior of this
51        case in cases when inbound replication is disabled"""
52
53     def setUp(self):
54         super(DrsReplicaSyncTestCase, self).setUp()
55
56     def tearDown(self):
57         # re-enable replication
58         self._enable_inbound_repl(self.dnsname_dc1)
59         super(DrsReplicaSyncTestCase, self).tearDown()
60
61     def _enable_inbound_repl(self, DC):
62         # make base command line
63         samba_tool_cmd = self._samba_tool_cmdline("options")
64         # disable replication
65         self.check_run("%s %s --dsa-option=-DISABLE_INBOUND_REPL" %(samba_tool_cmd, DC))
66
67     def _disable_inbound_repl(self, DC):
68         # make base command line
69         samba_tool_cmd = self._samba_tool_cmdline("options")
70         # disable replication
71         self.check_run("%s %s --dsa-option=+DISABLE_INBOUND_REPL" %(samba_tool_cmd, DC))
72
73     def test_ReplEnabled(self):
74         """Tests we can replicate when replication is enabled"""
75         self._enable_inbound_repl(self.dnsname_dc1)
76         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False)
77
78     def test_ReplDisabled(self):
79         """Tests we cann't replicate when replication is disabled"""
80         self._disable_inbound_repl(self.dnsname_dc1)
81         try:
82             self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False)
83         except samba.tests.BlackboxProcessError, e:
84             self.assertTrue('WERR_DS_DRA_SINK_DISABLED' in e.stderr)
85         else:
86             self.fail("'drs replicate' command should have failed!")
87
88     def test_ReplDisabledForced(self):
89         """Tests we cann't replicate when replication is disabled"""
90         self._disable_inbound_repl(self.dnsname_dc1)
91         out = self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)