import time
import os
-sys.path.append("bin/python")
+sys.path.insert(0, "bin/python")
-from samba.auth import system_session
from ldb import SCOPE_BASE
-from samba.samdb import SamDB
-import samba.tests
+import drs_base
-class DrsFsmoTestCase(samba.tests.TestCase):
-
- # RootDSE msg for DC1
- info_dc1 = None
- ldb_dc1 = None
- # RootDSE msg for DC1
- info_dc2 = None
- ldb_dc2 = None
+class DrsFsmoTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsFsmoTestCase, self).setUp()
# we have to wait for the replication before we make the check
- self.sleep_time = 5
- # connect to DCs singleton
- if self.ldb_dc1 is None:
- DrsFsmoTestCase.dc1 = get_env_var("DC1")
- DrsFsmoTestCase.ldb_dc1 = connect_samdb(self.dc1)
- if self.ldb_dc2 is None:
- DrsFsmoTestCase.dc2 = get_env_var("DC2")
- DrsFsmoTestCase.ldb_dc2 = connect_samdb(self.dc2)
-
- # fetch rootDSEs
- if self.info_dc1 is None:
- ldb = self.ldb_dc1
- res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
- self.assertEquals(len(res), 1)
- DrsFsmoTestCase.info_dc1 = res[0]
- if self.info_dc2 is None:
- ldb = self.ldb_dc2
- res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
- self.assertEquals(len(res), 1)
- DrsFsmoTestCase.info_dc2 = res[0]
+ self.fsmo_wait_max_time = 20
+ self.fsmo_wait_sleep_time = 0.2
# cache some of RootDSE props
- self.schema_dn = self.info_dc1["schemaNamingContext"][0]
- self.domain_dn = self.info_dc1["defaultNamingContext"][0]
- self.config_dn = self.info_dc1["configurationNamingContext"][0]
self.dsServiceName_dc1 = self.info_dc1["dsServiceName"][0]
self.dsServiceName_dc2 = self.info_dc2["dsServiceName"][0]
self.infrastructure_dn = "CN=Infrastructure," + self.domain_dn
self.naming_dn = "CN=Partitions," + self.config_dn
self.rid_dn = "CN=RID Manager$,CN=System," + self.domain_dn
- # we will need DCs DNS names for 'net fsmo' command
- self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
- self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]
- pass
-
def tearDown(self):
super(DrsFsmoTestCase, self).tearDown()
- def _net_fsmo_role_transfer(self, DC, role):
- # find out where is net command
- net_cmd = os.path.abspath("./bin/net")
+ def _net_fsmo_role_transfer(self, DC, role, noop=False):
+ # find out where is samba-tool command
+ net_cmd = os.path.abspath("./bin/samba-tool")
# make command line credentials string
- creds = samba.tests.cmdline_credentials
+ creds = self.get_credentials()
cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
creds.get_username(), creds.get_password())
- # bin/net fsmo transfer --role=role --host=ldap://DC:389
- cmd_line = "%s fsmo transfer --role=%s --host=ldap://%s:389 %s" % (net_cmd, role, DC,
- cmd_line_auth)
- ret = os.system(cmd_line)
- self.assertEquals(ret, 0, "Transfering schema to %s has failed!" % (DC))
- pass
+ (result, out, err) = self.runsubcmd("fsmo", "transfer",
+ "--role=%s" % role,
+ "-H", "ldap://%s:389" % DC,
+ cmd_line_auth)
+
+ self.assertCmdSuccess(result, out, err)
+ self.assertEquals(err,"","Shouldn't be any error messages")
+ if noop == False:
+ self.assertTrue("FSMO transfer of '%s' role successful" % role in out)
+ else:
+ self.assertTrue("This DC already has the '%s' FSMO role" % role in out)
+
+
+ def _wait_for_role_transfer(self, ldb_dc, role_dn, master):
+ """Wait for role transfer for certain amount of time
+
+ :return: (Result=True|False, CurrentMasterDnsName) tuple
+ """
+ cur_master = ''
+ retries = int(self.fsmo_wait_max_time / self.fsmo_wait_sleep_time) + 1
+ for i in range(0, retries):
+ # check if master has been transfered
+ res = ldb_dc.search(role_dn,
+ scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
+ assert len(res) == 1, "Only one fSMORoleOwner value expected!"
+ cur_master = res[0]["fSMORoleOwner"][0]
+ if master == cur_master:
+ return (True, cur_master)
+ # skip last sleep, if no need to wait anymore
+ if i != (retries - 1):
+ # wait a little bit before next retry
+ time.sleep(self.fsmo_wait_sleep_time)
+ return (False, cur_master)
def _role_transfer(self, role, role_dn):
"""Triggers transfer of role from DC1 to DC2
and vice versa so the role goes back to the original dc"""
- # dc2 gets the schema master role from dc1
+ # dc2 gets the role from dc1
print "Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc1, self.dnsname_dc2)
self._net_fsmo_role_transfer(DC=self.dnsname_dc2, role=role)
- # check if the role is transfered, but wait a little first so the getncchanges can pass
- time.sleep(self.sleep_time)
- res = self.ldb_dc2.search(role_dn,
- scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
- assert len(res) == 1
- self.master = res[0]["fSMORoleOwner"][0]
- self.assertEquals(self.master, self.dsServiceName_dc2,
- "Transfering %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc2,self.master))
-
- # dc1 gets back the schema master role from dc2
+ # check if the role is transfered
+ (res, master) = self._wait_for_role_transfer(ldb_dc=self.ldb_dc2,
+ role_dn=role_dn,
+ master=self.dsServiceName_dc2)
+ self.assertTrue(res,
+ "Transferring %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc2, master))
+
+ # dc1 gets back the role from dc2
print "Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc2, self.dnsname_dc1)
- self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role);
+ self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role)
+ # check if the role is transfered
+ (res, master) = self._wait_for_role_transfer(ldb_dc=self.ldb_dc1,
+ role_dn=role_dn,
+ master=self.dsServiceName_dc1)
+ self.assertTrue(res,
+ "Transferring %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc1, master))
+
+ # dc1 keeps the role
+ print "Testing for no-op %s role transfer from %s to %s" % (role, self.dnsname_dc2, self.dnsname_dc1)
+ self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role, noop=True)
# check if the role is transfered
- time.sleep(self.sleep_time)
- res = self.ldb_dc1.search(role_dn,
- scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
- assert len(res) == 1
- self.master = res[0]["fSMORoleOwner"][0]
- self.assertEquals(self.master, self.dsServiceName_dc1,
- "Transfering %s role to %s has failed, master is %s"%(role, self.dsServiceName_dc1, self.master))
- pass
+ (res, master) = self._wait_for_role_transfer(ldb_dc=self.ldb_dc1,
+ role_dn=role_dn,
+ master=self.dsServiceName_dc1)
+ self.assertTrue(res,
+ "Transferring %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc1, master))
def test_SchemaMasterTransfer(self):
self._role_transfer(role="schema", role_dn=self.schema_dn)
- pass
def test_InfrastructureMasterTransfer(self):
self._role_transfer(role="infrastructure", role_dn=self.infrastructure_dn)
- pass
def test_PDCMasterTransfer(self):
self._role_transfer(role="pdc", role_dn=self.domain_dn)
- pass
def test_RIDMasterTransfer(self):
self._role_transfer(role="rid", role_dn=self.rid_dn)
- pass
-
-
-########################################################################################
-def get_env_var(var_name):
- if not var_name in os.environ.keys():
- raise AssertionError("Please supply %s in environment" % var_name)
- return os.environ[var_name]
-
-def connect_samdb(samdb_url):
- ldb_options = []
- if not "://" in samdb_url:
- if os.path.isfile(samdb_url):
- samdb_url = "tdb://%s" % samdb_url
- else:
- samdb_url = "ldap://%s:389" % samdb_url
- # user 'paged_search' module when connecting remotely
- ldb_options = ["modules:paged_searches"]
-
- return SamDB(url=samdb_url,
- lp=samba.tests.env_loadparm(),
- session_info=system_session(),
- credentials=samba.tests.cmdline_credentials,
- options=ldb_options)
-
-
+ def test_NamingMasterTransfer(self):
+ self._role_transfer(role="naming", role_dn=self.naming_dn)