print 'Unable to open configuration file: ' + configfile
sys.exit(1)
-
#
# open the output file
#
super(MatchRuleConditionTests, self).tearDown()
self.ldb.delete(self.ou, controls=['tree_delete:0'])
-
def test_g1_members(self):
res1 = self.ldb.search(self.ou,
scope=SCOPE_SUBTREE,
# Ensure the LDB is closed now, so we close the FD
del(self.l)
-
def setUp(self):
super(SearchTests, self).setUp()
self.testdir = tempdir()
self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
self.assertIn(estr, "ldb FULL SEARCH disabled")
-
def test_subtree_and_or(self):
"""Testing a search"""
self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
self.assertIn(estr, "ldb FULL SEARCH disabled")
-
def test_dn_filter_one(self):
"""Testing that a dn= filter succeeds
(or fails with disallowDNFilter
# https://bugzilla.samba.org/show_bug.cgi?id=13361
self.assertEquals(len(res), 4)
-
def tearDown(self):
super(BadIndexTests, self).tearDown()
found = True
self.assertTrue(found)
-
# Show that search results can't see into a transaction
-
def test_search_against_trans(self):
found11 = False
(got_pid, status) = os.waitpid(pid, 0)
self.assertEqual(got_pid, pid)
-
def test_search_iter_against_trans(self):
found = False
found11 = False
addrecbuf,
None)
-
print("querying the NS record")
res = dns_conn.DnssrvEnumRecords2(0x00070000,
0,
is_deleted = 'isDeleted' in obj and obj['isDeleted'][0].upper() == 'TRUE'
target_is_deleted = 'isDeleted' in res[0] and res[0]['isDeleted'][0].upper() == 'TRUE'
-
if is_deleted and not obj.dn in self.deleted_objects_containers and linkID:
# A fully deleted object should not have any linked
# attributes. (MS-ADTS 3.1.1.5.5.1.1 Tombstone
attrname, syntax_oid)
diff_count += 1
-
return error_count
-
def get_originating_time(self, val, attid):
'''Read metadata properties and return the originating time for
a given attributeId.
return (set_att, list_attid, wrong_attids)
-
def fix_metadata(self, obj, attr):
'''re-write replPropertyMetaData elements for a single attribute for a
object. This is used to fix missing replPropertyMetaData elements'''
self.report("Fixed attribute '%s' of '%s'\n" % (sd_attr, dn))
self.samdb.set_session_info(self.system_session_info)
-
def has_replmetadata_zero_invocationid(self, dn, repl_meta_data):
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(repl_meta_data))
return found
-
def err_replmetadata_zero_invocationid(self, dn, attr, repl_meta_data):
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(repl_meta_data))
"Failed to fix attribute %s" % attr):
self.report("Fixed attribute '%s' of '%s'\n" % (attr, dn))
-
def err_replmetadata_unknown_attid(self, dn, attr, repl_meta_data):
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(repl_meta_data))
self.report('ERROR: attributeID 0X%0X is not known in our schema, not fixing %s on %s\n' % (o.attid, attr, dn))
return
-
def err_replmetadata_incorrect_attid(self, dn, attr, repl_meta_data, wrong_attids):
repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
str(repl_meta_data))
"Failed to fix attribute %s" % attr):
self.report("Fixed attribute '%s' of '%s'\n" % (attr, dn))
-
def is_deleted_deleted_objects(self, obj):
faulty = False
if "description" not in obj:
# NCs
deleted_objects_dn = None
-
object_rdn_attr = None
object_rdn_val = None
name_val = None
self.samdb.transaction_commit()
-
elif not self.samdb.am_rodc():
self.report("No RID Set found for this server: %s, and we are not the RID Master (so can not self-allocate)" % dn)
-
# Check some details of our own RID Set
if dn == self.rid_set_dn:
res = self.samdb.search(base=self.rid_set_dn, scope=ldb.SCOPE_BASE,
else:
next_free_rid += 1
-
return error_count
################################################################
self.report("Changed dsServiceName to GUID form")
return error_count
-
###############################################
# re-index the database
-
def reindex_database(self):
'''re-index the whole database'''
m = ldb.Message()
return vmap, replacements
-
-
-
-
def compile_graph_key(key_items, nodes_above=[], elisions=None,
prefix='key_', width=2):
"""Generate a dot file snippet that acts as a legend for a graph.
vertices = [vmap[x] for x in vertices]
edges = [(vmap[a], vmap[b]) for a, b in edges]
-
vlen = max(6, max(len(v) for v in vertices))
# first, the key for the columns
(enum, estr) = e4.args
raise DCJoinException(estr)
-
ctx.base_dn = str(ctx.samdb.get_default_basedn())
ctx.root_dn = str(ctx.samdb.get_root_basedn())
ctx.schema_dn = str(ctx.samdb.get_schema_basedn())
(ldb.binary_encode("dns-%s" % ctx.myname),
ldb.binary_encode("dns/%s" % ctx.dnshostname)))
-
def cleanup_old_join(ctx, force=False):
"""Remove any DNs from a previous join."""
# find the krbtgt link
if ctx.dns_cname_dn:
ctx.del_noerror(ctx.dns_cname_dn)
-
-
def promote_possible(ctx):
"""confirm that the account is just a bare NT4 BDC or a member server, so can be safely promoted"""
if ctx.subdomain:
ctx.promote_from_dn = res[0].dn
-
def find_dc(ctx, domain):
"""find a writeable DC for the given domain"""
try:
ctx.site = ctx.cldap_ret.client_site
return ctx.cldap_ret.pdc_dns_name
-
def get_behavior_version(ctx):
res = ctx.samdb.search(base=ctx.base_dn, scope=ldb.SCOPE_BASE, attrs=["msDS-Behavior-Version"])
if "msDS-Behavior-Version" in res[0]:
r.attid = ctx.tmp_samdb.get_attid_from_lDAPDisplayName(attrname)
r.value_ctr = 1
-
def DsAddEntry(ctx, recs):
'''add a record via the DRSUAPI DsAddEntry call'''
if ctx.drsuapi is None:
dns_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[%s]" % (ctx.server, binding_options),
ctx.lp, ctx.creds)
-
name_found = True
sd_helper = samba.sd_utils.SDUtils(ctx.samdb)
% (security.SECINFO_OWNER
| security.SECINFO_GROUP)])
-
# Add record
ctx.logger.info("Adding DNS CNAME record %s.%s for %s"
% (msdcs_cname, msdcs_zone, cname_target))
ctx.logger.info("All other DNS records (like _ldap SRV records) " +
"will be created samba_dnsupdate on first startup")
-
def join_replicate_new_dns_records(ctx):
for nc in (ctx.domaindns_zone, ctx.forestdns_zone):
if nc in ctx.nc_list:
replica_flags=ctx.replica_flags,
full_sync=False)
-
-
def join_finalise(ctx):
"""Finalise the join, mark us synchronised and setup secrets db."""
}
ctx.local_samdb.add(rec)
-
def build_nc_lists(ctx):
# nc_list is the list of naming context (NC) for which we will
# replicate in and send a updateRef command to the partner DC
for v in vl:
out.append("%s: %s" % (l[0], v))
-
return "\n".join(out)
except:
raise CommandError("Failed to connect to DB at %s. If this is a really old sam.ldb (before alpha9), then try again with --force-modules" % H)
-
if H is None or not over_ldap:
samdb_schema = samdb
else:
metavar="URL", dest="H"),
]
-
takes_args = ["accountname", "onoff"]
def run(self, accountname, onoff, H=None, credopts=None, sambaopts=None,
print_dns_record(outf, dns_rec)
-
-
# Convert data into a dns record
def data_to_dns_record(record_type, data):
if record_type == dnsp.DNS_TYPE_A:
if samba.is_ntvfs_fileserver_built():
takes_options.extend(common_ntvfs_options)
-
takes_args = ["domain", "role?"]
def run(self, domain, role=None, sambaopts=None, credopts=None,
nmsg["options"] = ldb.MessageElement(str(dsa_options), ldb.FLAG_MOD_REPLACE, "options")
samdb.modify(nmsg)
-
self.errf.write("Asking partner server %s to synchronize from us\n"
% server)
for part in (samdb.get_schema_basedn(),
remote_samdb.modify(msg)
raise CommandError("Error while renaming %s to %s" % (str(dc_dn), str(newdn)), e)
-
server_dsa_dn = samdb.get_serverName()
domain = remote_samdb.get_root_basedn()
raise CommandError("Failed to find trust for domain '%s'" % domain)
raise self.RemoteRuntimeError(self, error, "failed to locate remote server")
-
if remote_policy_access is not None:
try:
remote_server = self.setup_remote_server(credopts, domain)
% (removed_objects, removed_links))
-
class cmd_domain_trust(SuperCommand):
"""Domain and forest trust management."""
return count
-
def _apply_update(self, samdb, update_file, base_dir):
"""Wrapper function for parsing an LDIF file and applying the updates"""
return "failed, result %u (%s)" % (ecode, estring)
-
def attr_default(msg, attrname, default):
'''get an attribute from a ldap msg with a default'''
if attrname in msg:
return default
-
def drs_parse_ntds_dn(ntds_dn):
'''parse a NTDS DN returning a site and server'''
a = ntds_dn.split(',')
self.message(colour.c_GREEN("[ALL GOOD]"))
-
def summary_output(self):
return self.summary_output_handler("summary")
source_dsa_guid = msg[0]['objectGUID'][0]
dsa_options = int(attr_default(msg, 'options', 0))
-
req_options = 0
if not (dsa_options & dsdb.DS_NTDSDSA_OPT_DISABLE_OUTBOUND_REPL):
req_options |= drsuapi.DRSUAPI_DRS_WRIT_REP
self.message("Replicate from %s to %s was successful." % (SOURCE_DC, DEST_DC))
-
class cmd_drs_bind(Command):
"""Show DRS capabilities of a server."""
self.message("Forest GUID: %s" % info.info.config_dn_guid)
-
class cmd_drs_options(Command):
"""Query or change 'options' for NTDS Settings object of a Domain Controller."""
)
-
class cmd_dsacl_set(Command):
"""Modify access list on a directory object."""
self.outf.write("FSMO seize of '%s' role successful\n" % role)
return True
-
def run(self, force=None, H=None, role=None,
credopts=None, sambaopts=None, versionopts=None):
)
-
class cmd_ntacl_set(Command):
"""Set ACLs on a file."""
elif use_s3fs:
use_ntvfs = False
-
s3conf = s3param.get_context()
s3conf.load(lp.configfile)
# ensure we are using the right samba_dsdb passdb backend, no matter what
raise NamingError("Failed to find account '%s'" % account)
return str(res[0]["dn"])
-
def run(self, *accounts, **kwargs):
sambaopts = kwargs.get("sambaopts")
credopts = kwargs.get("credopts")
else:
result = res[0]
-
msg = ldb.Message()
spns = result.get("servicePrincipalName")
tab = []
"""
synopsis = "%prog (<username>|--filter <filter>) [options]"
-
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"versionopts": options.VersionOptions,
except IndexError:
return None
-
# get the value for a virtualCrypt attribute.
# look for an exact match on algorithm and rounds in supplemental creds
# if not found calculate using Primary:CLEARTEXT
"BACKEND_STORE": backend_store_line
})
-
setup_add_ldif(samdb, setup_path("provision_init.ldif"), {
"BACKEND_TYPE": provision_backend.type,
"SERVER_ROLE": serverrole,
if domain_info["dns_domain"].upper() != dnsdomain.upper():
raise ProvisioningError('Realm as seen by pdb_samba_dsdb [%s] does not match Realm as seen by the provision script [%s]!' % (domain_info["dns_domain"].upper(), dnsdomain.upper()))
-
try:
if use_ntvfs:
os.chown(sysvol, -1, gid)
samdb.transaction_commit()
-
def offline_remove_dc_RemoveDsServer(samdb, ntds_dn):
samdb.start_transaction()
m["maxPwdAge"] = ldb.MessageElement(value, ldb.FLAG_MOD_REPLACE, "maxPwdAge")
self.modify(m)
-
def get_maxPwdAge(self):
res = self.search(self.domain_dn(), scope=ldb.SCOPE_BASE, attrs=["maxPwdAge"])
if len(res) == 0:
else:
return int(res[0]["maxPwdAge"][0])
-
-
def set_minPwdLength(self, value):
value = str(value).encode('utf8')
m = ldb.Message()
'''garbage_collect_tombstones(lp, samdb, [dn], current_time, tombstone_lifetime)
-> (num_objects_expunged, num_links_expunged)'''
-
if tombstone_lifetime is None:
return dsdb._dsdb_garbage_collect_tombstones(self, dn,
current_time)
c.set_kerberos_state(kerberos_state)
return c
-
-
# These functions didn't exist before Python2.7:
if sys.version_info < (2, 7):
import warnings
# is subject to a race between smbd and the s4 rpc_server code
# as to which will set the description as it is DCE/RPC over SMB
-
def test_modify(self):
dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
response = self.creds.get_ntlm_response(flags=credentials.CLI_CRED_NTLM_AUTH,
challenge=binascii.a2b_hex(hex_challenge))
-
self.assertEqual(response["nt_response"], binascii.a2b_hex(hex_ntlm_response))
self.assertEqual(response["nt_session_key"], binascii.a2b_hex(hex_session_key))
self.assertEqual(response["flags"], credentials.CLI_CRED_NTLM_AUTH)
-
def test_get_nt_hash_string(self):
self.creds.set_password_will_be_nt_hash(True)
hex_nthash = "c2ae1fe6e648846352453e816f2aeb93"
session_info=system_session(),
credentials=self.get_credentials())
-
self.custom_zone = "zone"
zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create_info.pszZoneName = self.custom_zone
request_filter)
self.assertEquals(0, zones.dwZoneCount)
-
def test_complexoperation2(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
return self._test_spnego_level_bind(auth_level=dcerpc.DCERPC_AUTH_LEVEL_PRIVACY,
g_auth_level=dcerpc.DCERPC_AUTH_LEVEL_PRIVACY)
-
-
def _test_spnego_signing_auth_level_request(self, auth_level):
ndr32 = base.transfer_syntax_ndr()
def test_spnego_signing_integrity(self):
return self._test_spnego_signing_auth_level_request(dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY)
-
def test_assoc_group_fail1(self):
abstract = samba.dcerpc.mgmt.abstract_syntax()
transfer = base.transfer_syntax_ndr()
self.assertEqual(info.dns_name2, "2.example.com")
del info
-
def test_string_delete(self):
gc.collect()
info = drsblobs.repsFromTo2OtherInfo()
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-
def contact_real_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.connect((host, port))
except socket.timeout:
self.fail("DNS server is too slow (timeout %s)" % timeout)
-
def test_single_forwarder_waiting_forever(self):
s = self.start_toy_server(dns_servers[0], 53, 'forwarder1')
s.send('timeout 10000', 0)
except:
self.fail("Unable to load parameters")
-
def tearDown(self):
super(SmbDotConfTests, self).tearDown()
os.unlink(self.smbconf)
doc_triple = "%s\n Expected: %s" % (param, value_to_use)
failset.add("%s\n Got: %s" % (doc_triple, value_found))
-
if len(failset) > 0:
self.fail(self._format_message(failset,
"Parameters that were unexpectedly not set:"))
self.creds.guess(self.lp)
self.session = system_session()
-
def test_sam_ldb_open_no_full_scan(self):
try:
self.samdb = SamDB(session_info=self.session,
self.assertEqual(os.WEXITSTATUS(status), 0)
self.assertEqual(got_pid, pid)
-
def _test_full_db_lock1(self, backend_path):
(r1, w1) = os.pipe()
backenddb = ldb.Ldb(backend_path)
-
backenddb.transaction_start()
backenddb.add({"dn": "@DSDB_LOCK_TEST"})
backend_path = self.lp.private_path(backend_subpath)
self._test_full_db_lock1(backend_path)
-
def test_full_db_lock1_config(self):
basedn = self.samdb.get_config_basedn()
backend_filename = "%s.ldb" % basedn.get_casefold()
backend_path = self.lp.private_path(backend_subpath)
self._test_full_db_lock1(backend_path)
-
def _test_full_db_lock2(self, backend_path):
(r1, w1) = os.pipe()
(r2, w2) = os.pipe()
self.assertIn(attr_ldap_name, [str(x) for x in idx_res[0]["@IDXATTR"]])
-
def test_AddUnIndexedAttribute(self):
# create names for an attribute to add
(attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("schemaAttributes-Attr-")
self.assertNotIn(attr_ldap_name, [str(x) for x in idx_res[0]["@IDXATTR"]])
-
def test_AddTwoIndexedAttributes(self):
# create names for an attribute to add
(attr_name, attr_ldap_name, attr_dn) = self._make_obj_names("schemaAttributes-Attr-")
self.assertEquals(len(res[0]), 0)
self.assertFalse("@TEST_EXTRA" in res[0])
-
def test_modify_at_indexlist(self):
m = {"dn": "@INDEXLIST",
"@TEST_EXTRA": ["1"]
def test_update_ntlmssp_to_spnego(self):
self._test_update("GSS-SPNEGO", "ntlmssp")
-
def test_max_update_size(self):
"""Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
self.assertEquals(gpos[i].ds_path, ds_paths[i],
'ds_path did not match expected %s' % gpos[i].ds_path)
-
def test_gpo_ads_does_not_segfault(self):
try:
ads = gpo.ADS_STRUCT(self.server, 42, self.creds)
super(JoinTestCase, self).tearDown()
-
def test_join_makes_records(self):
-
"create a query packet containing one query record via TCP"
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []
self.assertEquals(response.answers[0].rdata, self.join_ctx.dnshostname)
self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
-
def test_join_records_can_update(self):
dc_creds = Credentials()
dc_creds.guess(self.join_ctx.lp)
self.creds.set_username(os.environ["USERNAME"])
self.creds.set_password(os.environ["PASSWORD"])
-
def test_list_dsas(self):
my_kcc = kcc.KCC(unix_now, False, False, False, False)
my_kcc.load_samdb("ldap://%s" % os.environ["SERVER"],
self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
self.remove_files(dburl)
-
def test_samdb_to_ldif_file(self):
dburl = os.path.join(self.tempdir, "ldap")
dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
self.create_machine_account()
-
def tearDown(self):
super(PyKrb5CredentialsTests, self).tearDown()
delete_force(self.ldb, self.machine_dn)
self.netbios_domain = res[0]["nETBIOSName"][0]
self.dns_domain = self.ldb.domain_dns_name()
-
# Gets back the basedn
base_dn = self.ldb.domain_dn()
import binascii
-
class PassWordHashFl2003Tests(PassWordHashTests):
def setUp(self):
self.assertEquals(4, pos)
self.assertEquals("Primary:CLEARTEXT", ct_package.name)
-
# Check that the WDigest values are correct.
#
digests = ndr_unpack(drsblobs.package_PrimaryWDigestBlob,
def setUp(self):
super(PassWordHashFl2008Tests, self).setUp()
-
def test_default_supplementalCredentials(self):
self.add_user()
net_ctx.replicate_decrypt(drs, attr, 0)
sc_blob = attr.value_ctr.values[0].blob
-
sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
return sc
#
-
def test_setntacl_sysvol_dir_check_getposixacl(self):
acl = provision.SYSVOL_ACL
domsid = passdb.get_global_sam_sid()
# mask::rwx
# other::---
-
def test_setntacl_policies_dir_check_getposixacl(self):
acl = provision.POLICIES_ACL
domsid = passdb.get_global_sam_sid()
# mask::rwx
# other::---
-
-
def test_setntacl_policies_check_getposixacl(self):
acl = provision.POLICIES_ACL
self.create_machine_account()
self.create_user_account()
-
def tearDown(self):
super(PyCredentialsTests, self).tearDown()
delete_force(self.ldb, self.machine_dn)
(authenticator, subsequent) = self.get_authenticator(c)
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
-
def test_SamLogonEx(self):
c = self.get_netlogon_connection()
else:
raise
-
# Test Credentials.encrypt_netr_crypt_password
# By performing a NetrServerPasswordSet2
# And the logging on using the new password.
-
def test_encrypt_netr_password(self):
# Change the password
self.do_Netr_ServerPasswordSet2()
self.lp,
self.machine_creds)
-
# Change the current machine account password with a
# netr_ServerPasswordSet2 call.
-
def do_Netr_ServerPasswordSet2(self):
c = self.get_netlogon_connection()
(authenticator, subsequent) = self.get_authenticator(c)
credentials=creds, lp=lp)
return samdb
-
def runcmd(self, name, *args):
"""run a single level command"""
cmd = cmd_sambatool.subcommands[name]
self.assertEquals("%s" % found.get("description"),
computer["description"])
-
def tearDown(self):
super(ComputerCmdTestCase, self).tearDown()
# clean up all the left over computers, just in case
self.assertCmdFail(result, "Succeeded to create existing computer")
self.assertIn("already exists", err)
-
# try to delete all the computers we just created
for computer in self.computers:
(result, out, err) = self.runsubcmd("computer", "delete", "%s" %
"is of type '%s' where '%s' was expected."
% (record1, record2, dnstype1, dnstype2))
-
def test_update_valid_type(self):
for dnstype in self.good_records:
for record in self.good_records[dnstype]:
if self._find_group(group["name"]):
self.runsubcmd("group", "delete", group["name"])
-
def test_newgroup(self):
"""This tests the "group add" and "group delete" commands"""
# try to add all the groups again, this should fail
self.assertEquals("%s" % found.get("samaccountname"),
"%s" % group["name"])
-
def test_list(self):
(result, out, err) = self.runsubcmd("group", "list",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
class NtACLCmdSysvolTestCase(SambaToolCmdTest):
"""Tests for samba-tool ntacl sysvol* subcommands"""
-
def test_ntvfs(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
acl = "O:DAG:DUD:P(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;EA)(A;OICIIO;0x001f01ff;;;CO)(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001200a9;;;ED)S:AI(OU;CIIDSA;WP;f30e3bbe-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)(OU;CIIDSA;WP;f30e3bbf-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)"
-
def test_ntvfs(self):
path = os.environ['SELFTEST_PREFIX']
tempf = os.path.join(path, "pytests" + str(int(100000 * random.random())))
self.assertCmdSuccess(result, out, err,
"Failed to delete ou '%s'" % ou["name"])
-
def test_newou(self):
"""This tests the "ou create" and "ou delete" commands"""
# try to create all the ous again, this should fail
found = self.assertMatch(out, str(obj.dn),
"object '%s' not found" % obj.dn)
-
def test_list(self):
(result, out, err) = self.runsubcmd("ou", "list",
"--full-dn")
found = self.assertMatch(out, str(obj.dn),
"object '%s' not found" % obj.dn)
-
def _randomOU(self, base={}):
"""create an ou with random attribute values, you can specify base
attributes"""
(result, out, err) = self.runsubcmd("drs", "replicate", "--local", "unused",
os.environ["DC_SERVER"], self.base_dn)
-
def test_single_by_account_name(self):
(result, out, err) = self.runsubcmd("rodc", "preload", "sambatool1",
"--server", os.environ["DC_SERVER"])
user["checkUserFn"](user)
-
def tearDown(self):
super(UserCmdTestCase, self).tearDown()
# clean up all the left over users, just in case
if self._find_user(user["name"]):
self.runsubcmd("user", "delete", user["name"])
-
def test_newuser(self):
# try to add all the users again, this should fail
for user in self.users:
self.assertEquals(err, "", "setpassword with forced change")
self.assertMatch(out, "Changed password OK", "setpassword with forced change")
-
-
-
def test_setexpiry(self):
for user in self.users:
twodays = time.time() + (2 * 24 * 60 * 60)
expires = nttime2unix(int("%s" % found.get("accountExpires")))
self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
-
def test_list(self):
(result, out, err) = self.runsubcmd("user", "list",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
USER_PASS)
self._testWDigest(attribute, expected)
-
# Hash14 MD5(LOWER(sAMAccountName),
# UPPER(DNSDomainName),
# password)
#
-
def test_Wdigest14(self):
attribute = "virtualWDigest14"
expected = calc_digest(USER_NAME.lower(),
except IOError as e:
self.assertEquals(e.errno, errno.ENOENT)
-
# Open a SamDB with the don't create new DB flag cleared.
# The underlying database file does not exist.
#
# Should successful open the SamDB creating a new database file.
#
-
def test_create_db_new_file(self):
SamDB(url="tdb://" + self.tempdir + "/test.db", flags=0)
existing = open(self.tempdir + "/test.db", mode="rb")
desc1 = security.descriptor.from_sddl(text, dom)
self.assertRaises(TypeError, desc1.as_sddl, text)
-
def test_as_sddl_no_domainsid(self):
dom = security.dom_sid("S-2-0-0")
text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"
(username in pgids) and (pgids[username] is not None):
add_posix_attrs(samdb=result.samdb, sid=userdata[username].user_sid, name=username, nisdomain=domainname.lower(), xid_type="ID_TYPE_UID", home=homes[username], shell=shells[username], pgid=pgids[username], logger=logger)
-
logger.info("Adding users to groups")
# Start a new transaction (should speed this up a little, due to index churn)
result.samdb.transaction_start()
else:
raise Exception(parameter['name'] + " has an invalid param type " + parameter['type'])
-
file_out.write(output_string)
finally:
file_out.close()
parser.add_option("--target-service", dest="target_service",\
help="Target service for kerberos")
-
parser.add_option("--server-username", dest="server_username",\
help="User name server uses for local auth. [default: foo]")
parser.add_option("--server-password", dest="server_password",\
parser.add_option("--require-membership-of", dest="sid",\
help="Require that the user is a member of this group to authenticate.")
-
parser.add_option("-s", "--configfile", dest="config_file",\
help="Path to smb.conf file. [default:/etc/samba/smb.conf")
if buf.count("AF ", 0, 3) != 1:
sys.exit(4)
-
elif opts.client_helper == "ntlmssp-client-1" and opts.server_helper == "gss-spnego":
# We're in the parent
writeLine(client_out, "YR")
if buf.count("AF * ", 0, 5) != 1:
sys.exit(4)
-
elif opts.client_helper == "gss-spnego-client" and opts.server_helper == "gss-spnego":
# We're in the parent
writeLine(server_out, "YR")
return poss
-
# see [MS-ADTS] section 3.1.1.4.5.21
# and section 3.1.1.4.2 for this algorithm
# This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
self.fail()
-
def test_modify_u4(self):
"""11 Grant WP to PRINCIPAL_SELF and test modify"""
ldif = """
else:
self.fail()
-
def test_change_password7(self):
"""Try a password change operation without any CARs given"""
# users have change password by default - remove for negative testing
maybe_not, maybe_not))
random.seed(1)
-
-
for (j, c1, c2, v1, v2,
o1, o2, n1, n2) in random.sample(all_permutations, 100):
expression = ''.join(['(', j,
self._link_user_and_group(i, 0)
self.state.next_linked_user = e
-
test_00_01_adding_users_1000 = _test_add_many_users
test_00_10_complex_search_1k_users = _test_complex_search
self.assertEqual(objDeleted7["parentGUID"][0],
objDeleted6["objectGUID"][0])
-
objDeleted1 = self.search_guid(self.guid1)
objDeleted2 = self.search_guid(self.guid2)
objDeleted3 = self.search_guid(self.guid3)
self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
-
if not "://" in host:
if os.path.isfile(host):
host = "tdb://%s" % host
attrs=["objectGUID"],
controls=["dirsync:1:0:0"])
-
def test_dirsync_send_delta(self):
"""Check that dirsync return correct delta when sending the last cookie"""
res = self.ldb_admin.search(self.base_dn,
self.ldb_admin.deletegroup("testgroup")
self.assertEqual(len(res[0].get("member")), 0)
-
-
def test_dirsync_deleted_items(self):
"""Check that dirsync returnd deleted objects too"""
# Let's create an OU
self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
add_members_operation=True)
-
res = self.ldb_admin.search(self.base_dn,
expression="(name=Administrators)",
controls=[control1])
self._checkSchemaInfo(schi_before, schi_after)
pass
-
def _make_class_ldif(self, class_name, class_dn, sub_oid):
ldif = """
dn: """ + class_dn + """
self.ldb.add({'objectclass': 'organizationalUnit',
'dn': ou})
-
managers = []
for x in range(3):
m = "cn=manager%d,%s" % (x, ou)
self.ldb.delete(ou, ['tree_delete:1'])
-
def test_multivalued_attributes(self):
"""Test multi-valued attributes"""
ou = 'OU=mvattr,%s' % (self.base_dn)
self.ldb.delete(ou, ['tree_delete:1'])
-
def test_attribute_ranges(self):
"""Test attribute ranges"""
# Too short (min. 1)
delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-
# this test needs to be disabled until we really understand
# what the rDN length constraints are
-
def DISABLED_test_largeRDN(self):
"""Testing large rDN (limit 64 characters)"""
rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012"
self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
self.assertEquals(len(res[0]["servicePrincipalName;range=0-19"]), 20)
-
res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName;range=0-30"])
self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
self.assertEquals(len(res[0]["servicePrincipalName;range=0-*"]), 30)
self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
self.assertEquals(len(res[0]["servicePrincipalName;range=30-*"]), 0)
-
res = ldb.search(self.base_dn, expression="(cn=ldaptest2computer))", scope=SCOPE_SUBTREE, attrs=["servicePrincipalName;range=10-40"])
self.assertEquals(len(res), 1, "Could not find (cn=ldaptest2computer)")
self.assertEquals(len(res[0]["servicePrincipalName;range=10-*"]), 20)
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
from __future__ import print_function
import optparse
import sys
# Delete the object
delete_force(self.ldb, "ou=%s,%s" % (object_name, self.base_dn))
-
def test_duplicate_attributeID(self):
"""Testing creating a duplicate attribute"""
rand = str(random.randint(1, 100000))
(enum, estr) = e2.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_attributeID_governsID(self):
"""Testing creating a duplicate attribute and class"""
rand = str(random.randint(1, 100000))
(enum, estr) = e3.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_cn(self):
"""Testing creating a duplicate attribute"""
rand = str(random.randint(1, 100000))
(enum, estr) = e5.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_explicit_ldapdisplayname(self):
"""Testing creating a duplicate attribute ldapdisplayname"""
rand = str(random.randint(1, 100000))
(enum, estr) = e6.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_explicit_ldapdisplayname_with_class(self):
"""Testing creating a duplicate attribute ldapdisplayname between
and attribute and a class"""
(enum, estr) = e7.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_via_rename_ldapdisplayname(self):
"""Testing creating a duplicate attribute ldapdisplayname"""
rand = str(random.randint(1, 100000))
(enum, estr) = e8.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_duplicate_via_rename_attributeID(self):
"""Testing creating a duplicate attributeID"""
rand = str(random.randint(1, 100000))
"""
self.ldb.modify_ldif(ldif)
-
def test_change_attributeID(self):
"""Testing change the attributeID"""
rand = str(random.randint(1, 100000))
(enum, estr) = e11.args
self.assertEquals(enum, ERR_CONSTRAINT_VIOLATION)
-
def test_change_attributeID_same(self):
"""Testing change the attributeID to the same value"""
rand = str(random.randint(1, 100000))
(enum, estr) = e12.args
self.assertEquals(enum, ERR_CONSTRAINT_VIOLATION)
-
def test_generated_linkID(self):
"""
Test that we automatically generate a linkID if the
(enum, estr) = e22.args
self.assertEquals(enum, ERR_UNWILLING_TO_PERFORM)
-
def test_change_governsID(self):
"""Testing change the governsID"""
rand = str(random.randint(1, 100000))
(enum, estr) = e23.args
self.assertEquals(enum, ERR_CONSTRAINT_VIOLATION)
-
def test_change_governsID_same(self):
"""Testing change the governsID"""
rand = str(random.randint(1, 100000))
(enum, estr) = e24.args
self.assertEquals(enum, ERR_CONSTRAINT_VIOLATION)
-
def test_subClassOf(self):
""" Testing usage of custom child classSchema
"""
(num, _) = e28.args
self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
-
def _make_class_ldif(self, class_dn, class_name, sub_oid):
ldif = """
dn: """ + class_dn + """
self.assertEquals(len(res), 1)
self.assertFalse("msDS-IntId" in res[0])
-
def test_verify_msDS_IntId(self):
"""Verify msDS-IntId exists only on attributes without FLAG_SCHEMA_BASE_OBJECT flag set"""
count = 0
self.replace_linked_attribute, g2,
[u1, u2, u3, u2])
-
def test_la_links_replace2(self):
users = self.add_objects(12, 'user', 'u_replace2')
g1, = self.add_objects(1, 'group', 'g_replace2')
msg="attr[%s]=%r on dn[%s]" %
(name, res[0][name], res[0].dn))
-
print("%s = '%s'" % (name, res[0][name][0]))
if mode == "present":
self.assertEquals(uinfo21.last_logon, lastLogon)
self.assertEquals(uinfo21.logon_count, logonCount)
-
# check LDAP again and make sure the samr.QueryUserInfo
# doesn't have any impact.
res2 = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
print(firstLogon)
print(lastLogonTimestamp)
-
self.assertGreater(lastLogon, badPasswordTime)
self.assertGreaterEqual(lastLogon, lastLogonTimestamp)
msg=("second logon, firstlogon was %s" %
firstLogon))
-
lastLogon = int(res[0]["lastLogon"][0])
time.sleep(1)
"%s (which does not exist)" % dn)
-
def main():
global HOST, CREDS, LP
parser = optparse.OptionParser("rodc.py [options] <host>")
set_auto_replication(RWDC, False)
-
def get_server_ref_from_samdb(samdb):
server_name = samdb.get_serverName()
res = samdb.search(server_name,
"Expected: %s Got: %s" %
(errno, code))
-
def zero_min_password_age(self):
min_pwd_age = int(self.rwdc_db.get_minPwdAge())
if min_pwd_age != 0:
# This SHOULD succeed.
self.try_ldap_logon(RODC, creds2)
-
def test_change_password_reveal_on_demand_ntlm(self):
CREDS.set_kerberos_state(DONT_USE_KERBEROS)
self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
-
def test_fSMORoleOwner_attribute(self):
"""Test fSMORoleOwner attribute"""
print("Test fSMORoleOwner attribute")
["CN=Domain Controllers", "CN=Users,"],
]
-
-
for pr_object in protected_list:
try:
self.ldb.delete(pr_object[0] + "," + pr_object[1] + self.base_dn)
self.fail()
-
TestProgram(module=__name__, opts=subunitopts)
print("difference : %s" % sidset1.difference(sidset2))
self.fail(msg="calculated groups don't match against user PAC tokenGroups")
-
def test_tokenGroups_manual(self):
# Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
# and compare the result
if len(tokenGroupsSet.difference(wSet)):
self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
-
def filtered_closure(self, wSet, filter_grouptype):
res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
expression="(|(objectclass=user)(objectclass=group))",
closure(uSet, wSet, aSet)
-
def test_tokenGroupsGlobalAndUniversal_manual(self):
# Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
# and compare the result
res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
-
# urgent replication should be enabled when deleting
self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
# Now reconnect without domain admin rights
self.samdb = SamDB(url=ldaphost, credentials=self.unpriv_creds, lp=lp)
-
def tearDown(self):
super(UserAccountControlTests, self).tearDown()
for computername in self.computernames:
(enum, estr) = e11.args
self.assertEqual(ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, enum)
-
def test_admin_mod_uac(self):
computername = self.computernames[0]
self.add_computer_ldap(computername, samdb=self.admin_samdb)
self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE)
-
def test_uac_bits_set(self):
user_sid = self.sd_utils.get_object_sid(self.unpriv_user_dn)
mod = "(OA;;CC;bf967a86-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
else:
self.fail("Unable to set userAccountControl bit 0x%08X on %s: %s" % (bit, m.dn, estr))
-
def uac_bits_unrelated_modify_helper(self, account_type):
user_sid = self.sd_utils.get_object_sid(self.unpriv_user_dn)
mod = "(OA;;CC;bf967a86-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
(enum, estr) = e13.args
self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
-
def test_primarygroupID_priv_DC_modify(self):
computername = self.computernames[0]
scope=SCOPE_SUBTREE,
attrs=[""])
-
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_USERS))
scope=SCOPE_SUBTREE,
attrs=[""])
-
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_USERS))
(enum, estr) = e15.args
self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
-
def test_primarygroupID_priv_user_modify(self):
computername = self.computernames[0]
scope=SCOPE_SUBTREE,
attrs=[""])
-
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_ADMINS))
self.assertCorrectResults(results, expected_order,
offset, before, after)
-
-
def test_server_vlv_with_cookie_show_deleted(self):
"""What do we see with the show_deleted control?"""
attrs = ['objectGUID',
self.assertCorrectResults(results, expected_order,
offset, before, after)
-
def test_server_vlv_gte_with_cookie(self):
attrs = [x for x in self.users[0].keys() if x not in
('dn', 'objectclass')]
"testdenied", "$DC_PASSWORD", "ncacn_np:$SERVER", configuration])
-
plantestsuite("samba4.blackbox.provision-backend", "none", ["PYTHON=%s" % python, os.path.join(samba4srcdir, "setup/tests/blackbox_provision-backend.sh"), '$PREFIX/provision'])
# Test renaming the DC
dn_ordered=dn_ordered)
return (ctr6.new_highwatermark, ctr6.uptodateness_vector)
-
def _get_ctr6_dn_list(self, ctr6):
"""
Returns the DNs contained in a DsGetNCChanges response.
return dn_list
-
def _check_ctr6(self, ctr6, expected_dns=[], expected_links=[],
dn_ordered=True, links_ordered=True,
more_data=False, nc_object_count=0,
return partial_attribute_set
-
class AbstractLink:
def __init__(self, attid, flags, identifier, targetGUID,
targetDN=""):
else:
self.assertTrue("This DC already has the '%s' FSMO role" % role in out)
-
def _wait_for_role_transfer(self, ldb_dc, role_dn, master):
"""Wait for role transfer for certain amount of time
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
pfm = ctr.mapping_ctr
-
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=self.user,
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
pfm = ctr.mapping_ctr
-
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=self.user,
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
pfm = ctr.mapping_ctr
-
req8 = self._exop_req8(dest_dsa=None,
invocation_id=dc_guid_1,
nc_dn_str=self.user,
return links
-
def assert_forward_links(self, obj, expected, attr='member'):
results = self.attr_search(obj, expected, attr)
self.assertEqual(len(results), len(expected))
new_dn = res[0].dn
self.assert_forward_links(new_dn, {})
-
def test_la_links_delete_link(self):
u1, u2 = self.add_objects(2, 'user', 'u_del_link')
g1, g2 = self.add_objects(2, 'group', 'g_del_link')
return user_cur
-
def test_ReplicateMoveObject1(self):
"""Verifies how a moved container with a user inside is replicated between two DCs.
This test should verify that:
is_deleted=False,
expected_metadata=modified_metadata)
-
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
expected_metadata=deleted_modified_metadata_dc1)
self.assertFalse("description" in user_cur)
-
def test_ReplicateMoveObject2(self):
"""Verifies how a moved container with a user inside is not
replicated between two DCs as no replication is triggered
is_deleted=True,
expected_metadata=deleted_metadata_dc1)
-
def test_ReplicateMoveObject3(self):
"""Verifies how a moved container with a user inside is replicated between two DCs.
This test should verify that:
obj_orig=user_orig, is_deleted=False,
expected_metadata=initial_metadata)
-
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
is_deleted=True,
expected_metadata=deleted_metadata_dc2)
-
def test_ReplicateMoveObject3b(self):
"""Verifies how a moved container with a user inside is replicated between two DCs.
This test should verify that:
obj_orig=user_orig, is_deleted=False,
expected_metadata=initial_metadata)
-
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou2_dn)
self.ldb_dc1.rename(user_dn, new_dn)
is_deleted=True,
expected_metadata=deleted_metadata_dc2)
-
def test_ReplicateMoveObject4(self):
"""Verifies how a moved container with a user inside is replicated between two DCs.
This test should verify that:
(DRSUAPI_ATTID_objectCategory, self.dc1_guid, 2),
(DRSUAPI_ATTID_isRecycled, self.dc1_guid, 1)]
-
# check user info on DC1 - should be deleted user
user_cur = self._check_obj(sam_ldb=self.ldb_dc1, drs=self.drs_dc1,
obj_orig=user_moved_orig,
user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
self.assertFalse("description" in user_cur)
-
def test_ReplicateMoveObject6(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
self.assertFalse("description" in ou_cur)
-
def test_ReplicateMoveObject7(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
self.assertFalse("description" in ou_cur)
-
def test_ReplicateMoveObject8(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
self.assertFalse("description" in ou_cur)
-
def test_ReplicateMoveObject9(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
self.assertFalse("description" in ou_cur)
-
def test_ReplicateMoveObject10(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True)
self.assertFalse("description" in ou_cur)
-
def test_ReplicateMoveObject11(self):
"""Verifies how a moved container is replicated between two DCs.
This test should verify that:
self.assertFalse("description" in ou_cur)
-
class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
self.ou6["objectclass"] = "organizationalUnit"
self.ou6["ou"] = self.ou6_dn.get_component_value(0)
-
def tearDown(self):
self.ldb_dc1.delete(self.top_ou, ["tree_delete:1"])
self._enable_all_repl(self.dnsname_dc1)
return user_cur
-
def test_ReplicateMoveInTree1(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateMoveInTree2(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
self.assertFalse("description" in user_cur)
-
def test_ReplicateMoveInTree3(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateMoveInTree3b(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateMoveInTree4(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateAddInOU(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateAddInMovedOU(self):
"""Verifies how an object is replicated between two DCs.
This test should verify that:
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
-
def test_ReplicateAddInConflictOU_time(self):
"""Verifies how an object is replicated between two DCs, when created in an ambigious location
This test should verify that:
self.rodc_pass = "password12#"
self.computer_dn = "CN=%s,OU=Domain Controllers,%s" % (self.rodc_name, self.base_dn)
-
self.rodc_ctx = DCJoinContext(server=self.ldb_dc1.host_dns_name(),
creds=self.get_credentials(),
lp=self.get_loadparm(), site=self.site,
else:
self.assertEquals(list_1[i].originating_change_time, list_2[i].originating_change_time)
-
def _create_rodc(self, ctx):
ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
self._check_deleted(self.ldb_dc2, self.ou1)
self._check_deleted(self.ldb_dc2, self.ou2)
-
def test_ReplConflictsRenameRemoteWin_with_child(self):
"""Tests that objects created in conflict become conflict DNs"""
self._disable_inbound_repl(self.dnsname_dc1)
self._check_deleted(self.ldb_dc2, ou1_child)
self._check_deleted(self.ldb_dc2, ou2_child)
-
def test_ReplConflictsRenameLocalWin(self):
"""Tests that objects created in conflict become conflict DNs"""
self._disable_inbound_repl(self.dnsname_dc1)
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
-
# Check all deleted on DC1
self._check_deleted(self.ldb_dc1, self.ou1)
self._check_deleted(self.ldb_dc1, self.ou2)
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)
-
# Check all deleted on DC1
self._check_deleted(self.ldb_dc1, self.ou1)
self._check_deleted(self.ldb_dc1, self.ou2)
self.assertTrue(dodn in str(ou_cur["dn"]),
"OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
-
def test_ReplConflictsRODC(self):
"""Tests that objects created in conflict become conflict DNs"""
# Replicate all objects to RODC beforehand
self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
shutil.rmtree(targetdir, ignore_errors=True)
-
def test_rid_set_dbcheck_after_seize(self):
"""Perform a join against the RID manager and assert we have a RID Set.
We seize the RID master role, then using dbcheck, we assert that we can
t.wait_reboot()
-
def test_dcpromo_rodc(t, vm):
'''test the RODC dcpromo worked'''
t.info("Checking the w2k8 RODC join is OK")
else:
return subprocess.call(cmd, shell=shell, cwd=dir)
-
def run_child(self, cmd, dir="."):
'''create a child and return the Popen handle to it'''
cwd = os.getcwd()
''' % (self.getvar('LCREALM'), self.getvar('INTERFACE_IP')),
mode='a')
-
# add forwarding for the windows domains
domains = self.get_domains()
''' % (d, domains[d]),
mode='a')
-
self.write_file("etc/rndc.conf", '''
# Start of rndc.conf
key "rndc-key" {
};
''')
-
def stop_bind(self):
'''Stop our private BIND from listening and operating'''
self.rndc_cmd("stop", checkfail=False)
self.run_cmd("rm -rf var/named")
-
def start_bind(self):
'''restart the test environment version of bind'''
self.info("Restarting bind9")
self.info("restoring /etc/resolv.conf")
self.run_cmd("mv -f %s /etc/resolv.conf" % self.resolv_conf_backup)
-
def vm_poweroff(self, vmname, checkfail=True):
'''power off a VM'''
self.setvar('VMNAME', vmname)
child.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=5)
return True
-
def resolve_ip(self, hostname, retries=60, delay=5):
'''resolve an IP given a hostname, assuming NBT'''
while retries > 0:
self.info("retrying (retries=%u delay=%u)" % (retries, delay))
raise RuntimeError("Failed to resolve IP of %s" % hostname)
-
def open_telnet(self, hostname, username, password, retries=60, delay=5, set_time=False, set_ip=False,
disable_firewall=True, run_tlntadmn=True, set_noexpire=False):
'''open a telnet connection to a windows server, return the pexpect child'''
ret.append(self.vars[v])
return ret
-
def run_dcpromo_as_first_dc(self, vm, func_level=None):
self.setwinvars(vm)
self.info("Configuring a windows VM ${WIN_VM} at the first DC in the domain using dcpromo")
self.retry_cmd("host -t SRV _ldap._tcp.${WIN_REALM} ${WIN_IP}", ['has SRV record'], retries=60, delay=5)
-
def start_winvm(self, vm):
'''start a Windows VM'''
self.setwinvars(vm)
child.expect("Registration of the DNS resource records for all adapters of this computer has been initiated. Any errors will be reported in the Event Viewer")
child.expect("C:")
-
def test_remote_smbclient(self, vm, username="${WIN_USER}", password="${WIN_PASS}", args=""):
'''test smbclient against remote server'''
self.setwinvars(vm)
child.sendline("net use t: \\\\${HOSTNAME}.%s\\test" % realm)
child.expect("The command completed successfully")
-
def setup(self, testname, subdir):
'''setup for main tests, parsing command line'''
self.parser.add_option("--conf", type='string', default='', help='config file')