27107c1f6a6c7146886608bd095d74c4ac24da8b
[samba.git] / source4 / torture / drs / python / fsmo.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Anatoliy Atanasov <anatoliy.atanasov@postpath.com> 2010
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 #
21 # Usage:
22 #  export DC1=dc1_dns_name
23 #  export DC2=dc2_dns_name
24 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
25 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN fsmo -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
26 #
27
28 import sys
29 import time
30 import os
31
32 sys.path.append("bin/python")
33
34 from samba.auth import system_session
35 from ldb import SCOPE_BASE
36 from samba.samdb import SamDB
37
38 import samba.tests
39
class DrsFsmoTestCase(samba.tests.TestCase):
    """Check that FSMO roles can be transferred between two DCs.

    The DCs are named by the DC1 and DC2 environment variables.  Each test
    moves one role from DC1 to DC2 with 'net fsmo transfer', verifies the
    fSMORoleOwner attribute on the role object, then moves the role back.

    The SamDB connections and rootDSE records are stored on the class, so
    they are created by the first test's setUp() and reused by later ones.
    """

    # RootDSE msg for DC1
    info_dc1 = None
    ldb_dc1 = None
    # RootDSE msg for DC2
    info_dc2 = None
    ldb_dc2 = None

    def setUp(self):
        """Connect to both DCs (once) and cache the DNs the tests need."""
        super(DrsFsmoTestCase, self).setUp()

        # we have to wait for the replication before we make the check
        self.sleep_time = 5
        # connect to DCs singleton
        if self.ldb_dc1 is None:
            DrsFsmoTestCase.dc1 = get_env_var("DC1")
            DrsFsmoTestCase.ldb_dc1 = connect_samdb(self.dc1)
        if self.ldb_dc2 is None:
            DrsFsmoTestCase.dc2 = get_env_var("DC2")
            DrsFsmoTestCase.ldb_dc2 = connect_samdb(self.dc2)

        # fetch rootDSEs
        if self.info_dc1 is None:
            DrsFsmoTestCase.info_dc1 = self._fetch_rootdse(self.ldb_dc1)
        if self.info_dc2 is None:
            DrsFsmoTestCase.info_dc2 = self._fetch_rootdse(self.ldb_dc2)

        # cache some of RootDSE props
        self.schema_dn = self.info_dc1["schemaNamingContext"][0]
        self.domain_dn = self.info_dc1["defaultNamingContext"][0]
        self.config_dn = self.info_dc1["configurationNamingContext"][0]
        self.dsServiceName_dc1 = self.info_dc1["dsServiceName"][0]
        self.dsServiceName_dc2 = self.info_dc2["dsServiceName"][0]
        self.infrastructure_dn = "CN=Infrastructure," + self.domain_dn
        self.naming_dn = "CN=Partitions," + self.config_dn
        self.rid_dn = "CN=RID Manager$,CN=System," + self.domain_dn

        # we will need DCs DNS names for 'net fsmo' command
        self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
        self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]

    def tearDown(self):
        super(DrsFsmoTestCase, self).tearDown()

    def _fetch_rootdse(self, samdb):
        """Return the single rootDSE record read over connection *samdb*."""
        res = samdb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
        self.assertEqual(len(res), 1)
        return res[0]

    def _net_fsmo_role_transfer(self, DC, role):
        """Run 'bin/net fsmo transfer' so that *DC* takes over *role*.

        DC   -- DNS name of the DC that should receive the role
        role -- role name as understood by 'net fsmo transfer --role='

        Fails the test when the command exits with a non-zero status.
        """
        import subprocess

        # find out where is net command
        net_cmd = os.path.abspath("./bin/net")
        # make command line credentials string
        creds = samba.tests.cmdline_credentials
        cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
                                         creds.get_username(),
                                         creds.get_password())
        # bin/net fsmo transfer --role=role --host=ldap://DC:389
        # The arguments are passed as a list, bypassing the shell, so that
        # special characters in the credentials reach 'net' unmodified.
        cmd_args = [net_cmd, "fsmo", "transfer",
                    "--role=%s" % role,
                    "--host=ldap://%s:389" % DC,
                    cmd_line_auth]
        ret = subprocess.call(cmd_args)
        self.assertEqual(ret, 0,
                         "Transferring %s role to %s has failed!" % (role, DC))

    def _wait_for_role_owner(self, samdb, role_dn):
        """Sleep for self.sleep_time, then read fSMORoleOwner of *role_dn*.

        The value is read over connection *samdb*, remembered in
        self.master and returned.
        """
        # wait a little first so the getncchanges can pass
        time.sleep(self.sleep_time)
        res = samdb.search(role_dn,
                           scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        self.assertEqual(len(res), 1)
        self.master = res[0]["fSMORoleOwner"][0]
        return self.master

    def _role_transfer(self, role, role_dn):
        """Triggers transfer of role from DC1 to DC2
           and vice versa so the role goes back to the original dc"""
        # dc2 gets the role from dc1
        print("Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc1, self.dnsname_dc2))
        self._net_fsmo_role_transfer(DC=self.dnsname_dc2, role=role)
        # check if the role is transferred
        owner = self._wait_for_role_owner(self.ldb_dc2, role_dn)
        self.assertEqual(owner, self.dsServiceName_dc2,
                         "Transfering %s role to %s has failed, master is: %s!" % (role, self.dsServiceName_dc2, owner))

        # dc1 gets back the role from dc2
        print("Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc2, self.dnsname_dc1))
        self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role)
        # check if the role is transferred
        owner = self._wait_for_role_owner(self.ldb_dc1, role_dn)
        self.assertEqual(owner, self.dsServiceName_dc1,
                         "Transfering %s role to %s has failed, master is %s" % (role, self.dsServiceName_dc1, owner))

    def test_SchemaMasterTransfer(self):
        self._role_transfer(role="schema", role_dn=self.schema_dn)

    def test_InfrastructureMasterTransfer(self):
        self._role_transfer(role="infrastructure", role_dn=self.infrastructure_dn)

    def test_PDCMasterTransfer(self):
        self._role_transfer(role="pdc", role_dn=self.domain_dn)

    def test_RIDMasterTransfer(self):
        self._role_transfer(role="rid", role_dn=self.rid_dn)

    def test_NamingMasterTransfer(self):
        self._role_transfer(role="naming", role_dn=self.naming_dn)
154
155
156 ########################################################################################
def get_env_var(var_name):
    """Return the value of the environment variable *var_name*.

    Raises AssertionError, naming the missing variable, when it is not
    present in the environment.
    """
    try:
        return os.environ[var_name]
    except KeyError:
        raise AssertionError("Please supply %s in environment" % var_name)
161
def connect_samdb(samdb_url):
    """Open and return a SamDB connection for *samdb_url*.

    A value without a "://" scheme is expanded first: the path of an
    existing file becomes a tdb:// URL, anything else becomes an
    ldap:// URL on port 389.  The connection is created with the test
    loadparm context, a system session and the command line credentials.
    """
    if "://" not in samdb_url:
        if os.path.isfile(samdb_url):
            url_template = "tdb://%s"
        else:
            url_template = "ldap://%s:389"
        samdb_url = url_template % samdb_url

    # use the 'paged_searches' module when connecting remotely
    is_ldap = samdb_url.lower().startswith("ldap://")
    ldb_options = ["modules:paged_searches"] if is_ldap else []

    return SamDB(url=samdb_url,
                 lp=samba.tests.env_loadparm(),
                 session_info=system_session(),
                 credentials=samba.tests.cmdline_credentials,
                 options=ldb_options)
178
179
180