scope=ldb.SCOPE_BASE, attrs=["fSMORoleOwner"])
except LdbError, (num, msg):
if num == ldb.ERR_NO_SUCH_OBJECT:
- return "* The '%s' role is not present in this domain" % role
+ raise CommandError("The '%s' role is not present in this domain" % role)
raise
if 'fSMORoleOwner' in res[0]:
- master_owner = res[0]["fSMORoleOwner"][0]
- return master_owner
+ master_owner = (ldb.Dn(samdb, res[0]["fSMORoleOwner"][0]))
else:
- master_owner = "* The '%s' role does not have an FSMO roleowner" % role
- return master_owner
+ master_owner = None
+
+ return master_owner
def transfer_dns_role(outf, sambaopts, credopts, role, samdb):
naming_dn = "CN=Partitions,%s" % samdb.get_config_basedn()
infrastructure_dn = "CN=Infrastructure," + domain_dn
schema_dn = str(samdb.get_schema_basedn())
- new_owner = samdb.get_dsServiceName()
+ new_owner = ldb.Dn(samdb, samdb.get_dsServiceName())
m = ldb.Message()
m.dn = ldb.Dn(samdb, "")
if role == "rid":
else:
raise CommandError("Invalid FSMO role.")
- if not '*' in master_owner:
- if master_owner != new_owner:
- try:
- samdb.modify(m)
- except LdbError, (num, msg):
- raise CommandError("Transfer of '%s' role failed: %s" %
- (role, msg))
+ if master_owner is None:
+ outf.write("Cannot transfer, no DC assigned to the %s role. Try 'seize' instead\n" % role)
+ return False
- outf.write("FSMO transfer of '%s' role successful\n" % role)
- return True
- else:
- outf.write("This DC already has the '%s' FSMO role\n" % role)
- return False
+ if master_owner != new_owner:
+ try:
+ samdb.modify(m)
+ except LdbError, (num, msg):
+ raise CommandError("Transfer of '%s' role failed: %s" %
+ (role, msg))
+
+ outf.write("FSMO transfer of '%s' role successful\n" % role)
+ return True
else:
- outf.write("%s\n" % master_owner)
+ outf.write("This DC already has the '%s' FSMO role\n" % role)
return False
class cmd_fsmo_seize(Command):
#first try to transfer to avoid problem if the owner is still active
seize = False
master_owner = get_fsmo_roleowner(samdb, m.dn, role)
- if not '*' in master_owner:
+ # if there is a different owner
+ if master_owner is not None:
# if there is a different owner
if master_owner != serviceName:
# if --force isn't given, attempt transfer
#first try to transfer to avoid problem if the owner is still active
seize = False
master_owner = get_fsmo_roleowner(samdb, m.dn, role)
- if not '*' in master_owner:
+ if master_owner is not None:
# if there is a different owner
if master_owner != serviceName:
# if --force isn't given, attempt transfer
domaindns_dn = "CN=Infrastructure,DC=DomainDnsZones," + domain_dn
forestdns_dn = "CN=Infrastructure,DC=ForestDnsZones," + forest_dn
- infrastructureMaster = get_fsmo_roleowner(samdb, infrastructure_dn,
- "infrastructure")
- pdcEmulator = get_fsmo_roleowner(samdb, domain_dn, "pdc")
- namingMaster = get_fsmo_roleowner(samdb, naming_dn, "naming")
- schemaMaster = get_fsmo_roleowner(samdb, schema_dn, "schema")
- ridMaster = get_fsmo_roleowner(samdb, rid_dn, "rid")
- domaindnszonesMaster = get_fsmo_roleowner(samdb, domaindns_dn,
- "domaindns")
- forestdnszonesMaster = get_fsmo_roleowner(samdb, forestdns_dn,
- "forestdns")
-
- self.message("SchemaMasterRole owner: " + schemaMaster)
- self.message("InfrastructureMasterRole owner: " + infrastructureMaster)
- self.message("RidAllocationMasterRole owner: " + ridMaster)
- self.message("PdcEmulationMasterRole owner: " + pdcEmulator)
- self.message("DomainNamingMasterRole owner: " + namingMaster)
- self.message("DomainDnsZonesMasterRole owner: " + domaindnszonesMaster)
- self.message("ForestDnsZonesMasterRole owner: " + forestdnszonesMaster)
+ masters = [(schema_dn, "schema", "SchemaMasterRole"),
+ (infrastructure_dn, "infrastructure", "InfrastructureMasterRole"),
+ (rid_dn, "rid", "RidAllocationMasterRole"),
+ (domain_dn, "pdc", "PdcEmulationMasterRole"),
+ (naming_dn, "naming", "DomainNamingMasterRole"),
+ (domaindns_dn, "domaindns", "DomainDnsZonesMasterRole"),
+ (forestdns_dn, "forestdns", "ForestDnsZonesMasterRole"),
+ ]
+
+ for master in masters:
+ (dn, short_name, long_name) = master
+ try:
+ master = get_fsmo_roleowner(samdb, dn, short_name)
+ if master is not None:
+ self.message("%s owner: %s" % (long_name, str(master)))
+ else:
+ self.message("%s has no current owner" % (long_name))
+ except CommandError, e:
+ self.message("%s: * %s" % (long_name, e.message))
class cmd_fsmo_transfer(Command):
"""Transfer the role."""
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
-import os
+import os, ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
class FsmoCmdTestCase(SambaToolCmdTest):
self.assertCmdSuccess(result)
self.assertEquals(err,"","Shouldn't be any error messages")
+
+ # Check that the output is sensible
+ samdb = self.getSamDB("-H", "ldap://%s" % os.environ["SERVER"],
+ "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
+
+ try:
+ res = samdb.search(base=ldb.Dn(samdb, "CN=Infrastructure,DC=DomainDnsZones") + samdb.get_default_basedn(),
+ scope=ldb.SCOPE_BASE, attrs=["fsmoRoleOwner"])
+
+ self.assertTrue("DomainDnsZonesMasterRole owner: " + res[0]["fsmoRoleOwner"][0] in out)
+ except ldb.LdbError, (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ self.assertTrue("The 'domaindns' role is not present in this domain" in out)
+ else:
+ raise
+
+ res = samdb.search(base=samdb.get_default_basedn(),
+ scope=ldb.SCOPE_BASE, attrs=["fsmoRoleOwner"])
+
+ self.assertTrue("DomainNamingMasterRole owner: " + res[0]["fsmoRoleOwner"][0] in out)
def tearDown(self):
super(DrsFsmoTestCase, self).tearDown()
- def _net_fsmo_role_transfer(self, DC, role):
+ def _net_fsmo_role_transfer(self, DC, role, noop=False):
# find out where is samba-tool command
net_cmd = os.path.abspath("./bin/samba-tool")
# make command line credentials string
creds = self.get_credentials()
cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
creds.get_username(), creds.get_password())
- # bin/samba-tool fsmo transfer --role=role -H ldap://DC:389
- cmd_line = "%s fsmo transfer --role=%s -H ldap://%s:389 %s" % (net_cmd, role, DC,
- cmd_line_auth)
- ret = os.system(cmd_line)
- self.assertEquals(ret, 0, "Transferring role %s to %s has failed!" % (role, DC))
+ (result, out, err) = self.runsubcmd("fsmo", "transfer",
+ "--role=%s" % role,
+ "-H", "ldap://%s:389" % DC,
+ cmd_line_auth)
+
+ self.assertCmdSuccess(result)
+ self.assertEquals(err,"","Shouldn't be any error messages")
+ if noop == False:
+ self.assertTrue("FSMO transfer of '%s' role successful" % role in out)
+ else:
+ self.assertTrue("This DC already has the '%s' FSMO role" % role in out)
+
def _wait_for_role_transfer(self, ldb_dc, role_dn, master):
"""Wait for role transfer for certain amount of time
self.assertTrue(res,
"Transferring %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc1, master))
+ # dc1 keeps the role
+ print "Testing for no-op %s role transfer from %s to %s" % (role, self.dnsname_dc2, self.dnsname_dc1)
+ self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role, noop=True)
+ # check if the role is transfered
+ (res, master) = self._wait_for_role_transfer(ldb_dc=self.ldb_dc1,
+ role_dn=role_dn,
+ master=self.dsServiceName_dc1)
+ self.assertTrue(res,
+ "Transferring %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc1, master))
+
def test_SchemaMasterTransfer(self):
self._role_transfer(role="schema", role_dn=self.schema_dn)
def test_NamingMasterTransfer(self):
self._role_transfer(role="naming", role_dn=self.naming_dn)
-