dn = ldb.Dn(self.samdb, "<GUID=%s>" % guid_str)
res = self.samdb.search(base=dn, scope=ldb.SCOPE_BASE, attrs=[attr],
controls=["search_options:1:2",
- "show_recycled:1"])
+ "show_recycled:1"])
msg = res[0]
nmsg = ldb.Message()
nmsg.dn = dn
set_attrs_seen.add(str(attrname).lower())
if syntax_oid in [dsdb.DSDB_SYNTAX_BINARY_DN, dsdb.DSDB_SYNTAX_OR_NAME,
- dsdb.DSDB_SYNTAX_STRING_DN, ldb.SYNTAX_DN]:
+ dsdb.DSDB_SYNTAX_STRING_DN, ldb.SYNTAX_DN]:
# it's some form of DN, do specialised checking on those
error_count += self.check_dn(obj, attrname, syntax_oid)
else:
def mapper(self):
return {'kdc:user_ticket_lifetime': (self.set_kdc_tdb, self.explicit),
- 'kdc:service_ticket_lifetime': (self.set_kdc_tdb,
- self.mins_to_hours),
- 'kdc:renewal_lifetime': (self.set_kdc_tdb,
- self.days_to_hours),
+ 'kdc:service_ticket_lifetime': (self.set_kdc_tdb,
+ self.mins_to_hours),
+ 'kdc:renewal_lifetime': (self.set_kdc_tdb,
+ self.days_to_hours),
}
def __str__(self):
def mapper(self):
'''ldap value : samba setter'''
return {"minPwdAge": (self.ch_minPwdAge, self.days2rel_nttime),
- "maxPwdAge": (self.ch_maxPwdAge, self.days2rel_nttime),
- # Could be none, but I like the method assignment in
- # update_samba
- "minPwdLength": (self.ch_minPwdLength, self.explicit),
- "pwdProperties": (self.ch_pwdProperties, self.explicit),
+ "maxPwdAge": (self.ch_maxPwdAge, self.days2rel_nttime),
+ # Could be none, but I like the method assignment in
+ # update_samba
+ "minPwdLength": (self.ch_minPwdLength, self.explicit),
+ "pwdProperties": (self.ch_pwdProperties, self.explicit),
}
ctx.topology_dn = None
ctx.SPNs = ["HOST/%s" % ctx.myname,
- "HOST/%s" % ctx.dnshostname,
- "GC/%s/%s" % (ctx.dnshostname, ctx.dnsforest)]
+ "HOST/%s" % ctx.dnshostname,
+ "GC/%s/%s" % (ctx.dnshostname, ctx.dnsforest)]
res_rid_manager = ctx.samdb.search(scope=ldb.SCOPE_BASE,
attrs=["rIDManagerReference"],
"dn": ctx.krbtgt_dn,
"objectclass": "user",
"useraccountcontrol": str(samba.dsdb.UF_NORMAL_ACCOUNT |
- samba.dsdb.UF_ACCOUNTDISABLE),
+ samba.dsdb.UF_ACCOUNTDISABLE),
"showinadvancedviewonly": "TRUE",
"description": "krbtgt for %s" % ctx.samname}
ctx.samdb.add(rec, ["rodc_join:1:1"])
"objectclass": "server",
# windows uses 50000000 decimal for systemFlags. A windows hex/decimal mixup bug?
"systemFlags": str(samba.dsdb.SYSTEM_FLAG_CONFIG_ALLOW_RENAME |
- samba.dsdb.SYSTEM_FLAG_CONFIG_ALLOW_LIMITED_MOVE |
- samba.dsdb.SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE),
+ samba.dsdb.SYSTEM_FLAG_CONFIG_ALLOW_LIMITED_MOVE |
+ samba.dsdb.SYSTEM_FLAG_DISALLOW_MOVE_ON_DELETE),
# windows seems to add the dnsHostName later
"dnsHostName": ctx.dnshostname}
samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
ctx.SPNs.extend(["RestrictedKrbHost/%s" % ctx.myname,
- "RestrictedKrbHost/%s" % ctx.dnshostname])
+ "RestrictedKrbHost/%s" % ctx.dnshostname])
ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
ctx.secure_channel_type = misc.SEC_CHAN_RODC
ctx.RODC = True
ctx.replica_flags |= (drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING |
- drsuapi.DRSUAPI_DRS_GET_ALL_GROUP_MEMBERSHIP)
+ drsuapi.DRSUAPI_DRS_GET_ALL_GROUP_MEMBERSHIP)
ctx.domain_replica_flags = ctx.replica_flags
if domain_critical_only:
ctx.domain_replica_flags |= drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
def boot_method_string(boot_method):
enum_defs = ['DNS_BOOT_METHOD_UNINITIALIZED', 'DNS_BOOT_METHOD_FILE',
- 'DNS_BOOT_METHOD_REGISTRY', 'DNS_BOOT_METHOD_DIRECTORY']
+ 'DNS_BOOT_METHOD_REGISTRY', 'DNS_BOOT_METHOD_DIRECTORY']
return enum_string(dnsserver, enum_defs, boot_method)
def name_check_flag_string(check_flag):
enum_defs = ['DNS_ALLOW_RFC_NAMES_ONLY', 'DNS_ALLOW_NONRFC_NAMES',
- 'DNS_ALLOW_MULTIBYTE_NAMES', 'DNS_ALLOW_ALL_NAMES']
+ 'DNS_ALLOW_MULTIBYTE_NAMES', 'DNS_ALLOW_ALL_NAMES']
return enum_string(dnsserver, enum_defs, check_flag)
def zone_type_string(zone_type):
enum_defs = ['DNS_ZONE_TYPE_CACHE', 'DNS_ZONE_TYPE_PRIMARY',
- 'DNS_ZONE_TYPE_SECONDARY', 'DNS_ZONE_TYPE_STUB',
- 'DNS_ZONE_TYPE_FORWARDER', 'DNS_ZONE_TYPE_SECONDARY_CACHE']
+ 'DNS_ZONE_TYPE_SECONDARY', 'DNS_ZONE_TYPE_STUB',
+ 'DNS_ZONE_TYPE_FORWARDER', 'DNS_ZONE_TYPE_SECONDARY_CACHE']
return enum_string(dnsp, enum_defs, zone_type)
def zone_update_string(zone_update):
enum_defs = ['DNS_ZONE_UPDATE_OFF', 'DNS_ZONE_UPDATE_UNSECURE',
- 'DNS_ZONE_UPDATE_SECURE']
+ 'DNS_ZONE_UPDATE_SECURE']
return enum_string(dnsp, enum_defs, zone_update)
def zone_secondary_security_string(security):
enum_defs = ['DNS_ZONE_SECSECURE_NO_SECURITY', 'DNS_ZONE_SECSECURE_NS_ONLY',
- 'DNS_ZONE_SECSECURE_LIST_ONLY', 'DNS_ZONE_SECSECURE_NO_XFER']
+ 'DNS_ZONE_SECSECURE_LIST_ONLY', 'DNS_ZONE_SECSECURE_NO_XFER']
return enum_string(dnsserver, enum_defs, security)
def zone_notify_level_string(notify_level):
enum_defs = ['DNS_ZONE_NOTIFY_OFF', 'DNS_ZONE_NOTIFY_ALL_SECONDARIES',
- 'DNS_ZONE_NOTIFY_LIST_ONLY']
+ 'DNS_ZONE_NOTIFY_LIST_ONLY']
return enum_string(dnsserver, enum_defs, notify_level)
def dp_flags_string(dp_flags):
bitmap_defs = ['DNS_DP_AUTOCREATED', 'DNS_DP_LEGACY', 'DNS_DP_DOMAIN_DEFAULT',
- 'DNS_DP_FOREST_DEFAULT', 'DNS_DP_ENLISTED', 'DNS_DP_DELETED']
+ 'DNS_DP_FOREST_DEFAULT', 'DNS_DP_ENLISTED', 'DNS_DP_DELETED']
return bitmap_string(dnsserver, bitmap_defs, dp_flags)
def zone_flags_string(flags):
bitmap_defs = ['DNS_RPC_ZONE_PAUSED', 'DNS_RPC_ZONE_SHUTDOWN',
- 'DNS_RPC_ZONE_REVERSE', 'DNS_RPC_ZONE_AUTOCREATED',
- 'DNS_RPC_ZONE_DSINTEGRATED', 'DNS_RPC_ZONE_AGING',
- 'DNS_RPC_ZONE_UPDATE_UNSECURE', 'DNS_RPC_ZONE_UPDATE_SECURE',
- 'DNS_RPC_ZONE_READONLY']
+ 'DNS_RPC_ZONE_REVERSE', 'DNS_RPC_ZONE_AUTOCREATED',
+ 'DNS_RPC_ZONE_DSINTEGRATED', 'DNS_RPC_ZONE_AGING',
+ 'DNS_RPC_ZONE_UPDATE_UNSECURE', 'DNS_RPC_ZONE_UPDATE_SECURE',
+ 'DNS_RPC_ZONE_READONLY']
return bitmap_string(dnsserver, bitmap_defs, flags)
raise CommandError("Failed to find objectClass for user %s" % username)
session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
- AUTH_SESSION_INFO_AUTHENTICATED)
+ AUTH_SESSION_INFO_AUTHENTICATED)
# When connecting to a remote server, don't look up the local privilege DB
if self.url is not None and self.url.startswith('ldap'):
# Get new security descriptor
ds_sd_flags = (security.SECINFO_OWNER |
- security.SECINFO_GROUP |
- security.SECINFO_DACL)
+ security.SECINFO_GROUP |
+ security.SECINFO_DACL)
msg = get_gpo_info(self.samdb, gpo=gpo, sd_flags=ds_sd_flags)[0]
ds_sd_ndr = msg['nTSecurityDescriptor'][0]
ds_sd = ndr_unpack(security.descriptor, ds_sd_ndr).as_sddl()
# Set ACL
sio = (security.SECINFO_OWNER |
- security.SECINFO_GROUP |
- security.SECINFO_DACL |
- security.SECINFO_PROTECTED_DACL)
+ security.SECINFO_GROUP |
+ security.SECINFO_DACL |
+ security.SECINFO_PROTECTED_DACL)
conn.set_acl(sharepath, fs_sd, sio)
# Copy GPO files over SMB
filter = ("(&(sAMAccountType=%d)(sAMAccountName=%s))" %
(ATYPE_SECURITY_GLOBAL_GROUP,
- ldb.binary_encode(groupname)))
+ ldb.binary_encode(groupname)))
domaindn = samdb.domain_dn()
#
# May contain DOMAIN_NETBIOS and SERVER_NAME
self.servername_attributes = ["distinguishedName", "name", "CN", "sAMAccountName", "dNSHostName",
- "servicePrincipalName", "rIDSetReferences", "serverReference", "serverReferenceBL",
- "msDS-IsDomainFor", "interSiteTopologyGenerator", ]
+ "servicePrincipalName", "rIDSetReferences", "serverReference", "serverReferenceBL",
+ "msDS-IsDomainFor", "interSiteTopologyGenerator", ]
self.servername_attributes = [x.upper() for x in self.servername_attributes]
#
self.netbios_attributes = ["servicePrincipalName", "CN", "distinguishedName", "nETBIOSName", "name", ]
ol_mmr_urls=None, nosync=False, ldap_backend_forced_uri=None):
from samba.provision import setup_path
super(OpenLDAPBackend, self).__init__(backend_type=backend_type,
- paths=paths, lp=lp,
- names=names, logger=logger,
- domainsid=domainsid, schema=schema, hostname=hostname,
- ldapadminpass=ldapadminpass, slapd_path=slapd_path,
- ldap_backend_extra_port=ldap_backend_extra_port,
- ldap_backend_forced_uri=ldap_backend_forced_uri,
- ldap_dryrun_mode=ldap_dryrun_mode)
+ paths=paths, lp=lp,
+ names=names, logger=logger,
+ domainsid=domainsid, schema=schema, hostname=hostname,
+ ldapadminpass=ldapadminpass, slapd_path=slapd_path,
+ ldap_backend_extra_port=ldap_backend_extra_port,
+ ldap_backend_forced_uri=ldap_backend_forced_uri,
+ ldap_dryrun_mode=ldap_dryrun_mode)
self.ol_mmr_urls = ol_mmr_urls
self.nosync = nosync
cn_samba = read_and_sub_file(
setup_path("cn=samba.ldif"),
{"LDAPADMINPASS": self.ldapadminpass,
- "MMR_PASSWORD": mmr_pass,
- "MMR": mmr})
+ "MMR_PASSWORD": mmr_pass,
+ "MMR": mmr})
mapping = "schema-map-openldap-2.3"
backend_schema = "backend-schema.schema"
refint_config += read_and_sub_file(
setup_path("fedorads-refint-add.ldif"),
{"ARG_NUMBER": str(argnum),
- "LINK_ATTR": attr})
+ "LINK_ATTR": attr})
memberof_config += read_and_sub_file(
setup_path("fedorads-linked-attributes.ldif"),
{"MEMBER_ATTR": attr,
- "MEMBEROF_ATTR": lnkattr[attr]})
+ "MEMBEROF_ATTR": lnkattr[attr]})
index_config += read_and_sub_file(
setup_path("fedorads-index.ldif"), {"ATTR": attr})
argnum += 1
# Remove references to dnsHostName in A, AAAA, NS, CNAME and SRV
values = [ndr_unpack(dnsp.DnssrvRpcRecord, v)
- for v in orig_values if not to_remove(v)]
+ for v in orig_values if not to_remove(v)]
if len(values) != len(orig_values):
logger.info("updating %s keeping %d values, removing %s values"
# the schema files (and corresponding object version) that we know about
base_schemas = {
"2008_R2_old": ("MS-AD_Schema_2K8_R2_Attributes.txt",
- "MS-AD_Schema_2K8_R2_Classes.txt",
- 47),
+ "MS-AD_Schema_2K8_R2_Classes.txt",
+ 47),
"2008_R2": ("Attributes_for_AD_DS__Windows_Server_2008_R2.ldf",
- "Classes_for_AD_DS__Windows_Server_2008_R2.ldf",
- 47),
+ "Classes_for_AD_DS__Windows_Server_2008_R2.ldf",
+ 47),
"2012": ("AD_DS_Attributes__Windows_Server_2012.ldf",
- "AD_DS_Classes__Windows_Server_2012.ldf",
- 56),
+ "AD_DS_Classes__Windows_Server_2012.ldf",
+ 56),
"2012_R2": ("AD_DS_Attributes__Windows_Server_2012_R2.ldf",
- "AD_DS_Classes__Windows_Server_2012_R2.ldf",
- 69),
+ "AD_DS_Classes__Windows_Server_2012_R2.ldf",
+ 69),
}
def __init__(self, domain_sid, invocationid=None, schemadn=None,
computer_utf16 = computer.encode('utf-16-le')
real_stub = struct.pack('<IIII', 0x00200000,
- len(server) + 1, 0, len(server) + 1)
+ len(server) + 1, 0, len(server) + 1)
real_stub += server_utf16 + b'\x00\x00'
mod_len = len(real_stub) % 4
if mod_len != 0:
def read_datafile(filename):
paths = ["../../../../../testdata/samba3",
- "../../../../testdata/samba3"]
+ "../../../../testdata/samba3"]
for p in paths:
datadir = os.path.join(os.path.dirname(__file__), p)
if os.path.exists(datadir):
# test adding groups
for group in self.groups:
(result, out, err) = self.runsubcmd("group", "add", group["name"],
- "--description=%s" % group["description"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"],
- os.environ["DC_PASSWORD"]))
+ "--description=%s" % group["description"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"],
+ os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "There shouldn't be any error message")
def test_ntvfs(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
- "--use-ntvfs")
+ "--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
def test_s3fs(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
- "--use-s3fs")
+ "--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
def test_ntvfs_check(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
- "--use-ntvfs")
+ "--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
def test_s3fs_check(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
- "--use-s3fs")
+ "--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
- "--use-ntvfs")
+ "--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
- "--use-s3fs")
+ "--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
- "--use-ntvfs")
+ "--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
- "--use-ntvfs", "--as-sddl")
+ "--use-ntvfs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertEquals(self.acl + "\n", out, "Output should be the ACL")
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
- "--use-s3fs")
+ "--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertEquals(err, "", "Shouldn't be any error messages")
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
- "--use-s3fs", "--as-sddl")
+ "--use-s3fs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertEquals(self.acl + "\n", out, "Output should be the ACL")
# test adding users with --use-username-as-cn
for user in self.users:
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
- "--use-username-as-cn",
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--use-username-as-cn",
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages")
hash)
self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn,
"unicodePwd"),
- 140)
+ 140)
# This function should not decrement the version
hash[dn.lower()] = 130
hash)
self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn,
"unicodePwd"),
- 140)
+ 140)
def test_identic_rename(self):
rootdn = "DC=samba,DC=example,DC=com"
plantestsuite("samba3.wbinfo_simple.(%s:local).%s" % (env, t), "%s:local" % env, [os.path.join(srcdir(), "nsswitch/tests/test_wbinfo_simple.sh"), t])
plantestsuite("samba3.wbinfo_name_lookup", env,
[os.path.join(srcdir(),
- "nsswitch/tests/test_wbinfo_name_lookup.sh"),
- '$DOMAIN', '$REALM', '$DC_USERNAME'])
+ "nsswitch/tests/test_wbinfo_name_lookup.sh"),
+ '$DOMAIN', '$REALM', '$DC_USERNAME'])
env = "ad_member:local"
plantestsuite("samba3.wbinfo_user_info", env,
[os.path.join(srcdir(),
- "nsswitch/tests/test_wbinfo_user_info.sh"),
- '$DOMAIN', '$REALM', '$DOMAIN', 'alice', 'alice', 'jane', 'jane.doe'])
+ "nsswitch/tests/test_wbinfo_user_info.sh"),
+ '$DOMAIN', '$REALM', '$DOMAIN', 'alice', 'alice', 'jane', 'jane.doe'])
env = "fl2008r2dc:local"
plantestsuite("samba3.wbinfo_user_info", env,
[os.path.join(srcdir(),
- "nsswitch/tests/test_wbinfo_user_info.sh"),
- '$TRUST_DOMAIN', '$TRUST_REALM', '$DOMAIN', 'alice', 'alice', 'jane', 'jane.doe'])
+ "nsswitch/tests/test_wbinfo_user_info.sh"),
+ '$TRUST_DOMAIN', '$TRUST_REALM', '$DOMAIN', 'alice', 'alice', 'jane', 'jane.doe'])
env = "ad_member"
t = "WBCLIENT-MULTI-PING"
plantestsuite("samba3.blackbox.net_tdb", "simpleserver:local",
[os.path.join(samba3srcdir, "script/tests/test_net_tdb.sh"),
- smbclient3, '$SERVER', 'tmp', '$USERNAME', '$PASSWORD',
- configuration, '$LOCAL_PATH', '$LOCK_DIR'])
+ smbclient3, '$SERVER', 'tmp', '$USERNAME', '$PASSWORD',
+ configuration, '$LOCAL_PATH', '$LOCK_DIR'])
plantestsuite("samba3.blackbox.smbd_error", "simpleserver:local",
[os.path.join(samba3srcdir, "script/tests/test_smbd_error.sh")])
plantestsuite("samba3.blackbox.net_cache_samlogon", "ad_member:local",
[os.path.join(samba3srcdir, "script/tests/test_net_cache_samlogon.sh"),
- '$SERVER', 'tmp', '$DC_USERNAME', '$DC_PASSWORD'])
+ '$SERVER', 'tmp', '$DC_USERNAME', '$DC_PASSWORD'])
plantestsuite("samba3.blackbox.net_dom_join_fail_dc", "nt4_dc",
[os.path.join(samba3srcdir, "script/tests/test_net_dom_join_fail_dc.sh"),
else:
classinfo[name]["objectClassCategory"] = 0
for a in ["possSuperiors", "systemPossSuperiors",
- "auxiliaryClass", "systemAuxiliaryClass",
- "subClassOf"]:
+ "auxiliaryClass", "systemAuxiliaryClass",
+ "subClassOf"]:
classinfo[name][a] = []
if r.get(a):
for i in r[a]:
self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
self.ldb_user2.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
# but not other users
ldif = """
Member: """ + self.get_user_dn(self.user_with_wp)
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
ldif = """
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)"
- % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
+ % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
def test_modify_anonymous(self):
ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
mysid = ctx.get_mysid()
delete_force(self.ldb, user_dn)
try:
self.ldb.add({"dn": user_dn,
- "objectClass": "user",
- "sAMAccountName": user_name,
- "nTSecurityDescriptor": []})
+ "objectClass": "user",
+ "sAMAccountName": user_name,
+ "nTSecurityDescriptor": []})
self.fail()
except LdbError as e107:
(num, _) = e107.args
#
delete_force(self.ldb, user_dn)
self.ldb.add({"dn": user_dn,
- "objectClass": "user",
- "sAMAccountName": user_name})
+ "objectClass": "user",
+ "sAMAccountName": user_name})
m = Message()
m.dn = Dn(ldb, user_dn)
delete_force(self.ldb, user_dn)
self.ldb.add({"dn": user_dn,
- "objectClass": "user",
- "sAMAccountName": user_name})
+ "objectClass": "user",
+ "sAMAccountName": user_name})
#
# We check the following values:
def setUp(self):
super(SchemaTests_msDS_isRODC, self).setUp()
self.ldb = SamDB(host, credentials=creds,
- session_info=system_session(lp), lp=lp, options=ldb_options)
+ session_info=system_session(lp), lp=lp, options=ldb_options)
res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"])
self.assertEquals(len(res), 1)
self.base_dn = res[0]["defaultNamingContext"][0]
if 'LDAP_REFERRAL' not in stderr:
raise RodcRwdcTestException()
print("ignoring +%s REFERRAL error; assuming %s is RODC" %
- (opt, dc))
+ (opt, dc))
def preload_rodc_user(user_dn):
if result_attr == "msTSExpireDate4":
print('-' * 72)
print("This test fails against Windows with the "
- "default number of elements (33).")
+ "default number of elements (33).")
print("Try with --elements=27 (or similar).")
print('-' * 72)
self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
- AUTH_SESSION_INFO_AUTHENTICATED |
- AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
+ AUTH_SESSION_INFO_AUTHENTICATED |
+ AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
if creds.get_kerberos_state() == DONT_USE_KERBEROS:
session_info_flags |= AUTH_SESSION_INFO_NTLM
self.test_user_dn = res[0].dn
session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
- AUTH_SESSION_INFO_AUTHENTICATED |
- AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
+ AUTH_SESSION_INFO_AUTHENTICATED |
+ AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
if creds.get_kerberos_state() == DONT_USE_KERBEROS:
session_info_flags |= AUTH_SESSION_INFO_NTLM
if expected_results != results:
print("attr %s before %d after %d offset %d" %
- (attr, before, after, offset))
+ (attr, before, after, offset))
self.assertEquals(expected_results, results)
n = len(self.users)
if offset != 0:
raise
print("offset %d denominator %d raised error "
- "expected error %s\n"
- "(offset zero is illegal unless "
- "content count is zero)" %
- (offset, denominator, e))
+ "expected error %s\n"
+ "(offset zero is illegal unless "
+ "content count is zero)" %
+ (offset, denominator, e))
continue
results = [x[attr][0].lower() for x in res]
if cstr.startswith('vlv_resp'):
bits = cstr.rsplit(':')
print("the answer is %s; we said %d" %
- (bits[2], real_offset))
+ (bits[2], real_offset))
break
def test_server_vlv_no_cookie(self):
base=base,
scope=ldb.SCOPE_ONELEVEL)
print("searching for attr %s amongst %d deleted objects" %
- (attr, len(expected_order)))
+ (attr, len(expected_order)))
sort_control = "server_sort:1:0:%s" % attr
step = max(len(expected_order) // 10, 1)
for before in [3, 0]:
print(expected_order)
print()
print("\nattr %s offset %d before %d "
- "after %d gte %s" %
- (attr, offset, before, after, gte))
+ "after %d gte %s" %
+ (attr, offset, before, after, gte))
self.assertEquals(expected_results, results)
def test_multiple_searches(self):
"sAMAccountName": username,
"userPrincipalName": "test2@test.com",
"servicePrincipalName": ["test2/%s" % self.ldb_dc1.get_default_basedn(),
- "test3/%s" % self.ldb_dc1.get_default_basedn()],
+ "test3/%s" % self.ldb_dc1.get_default_basedn()],
"displayName": "test2"}
self.ldb_dc1.add(user_record)
ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
ctx.never_reveal_sid = ["<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
mysid = ctx.get_mysid()
t.cmd_contains("bin/samba-tool drs showrepl ${HOSTNAME}.${LCREALM} -k yes",
["INBOUND NEIGHBORS",
- "${BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${BASEDN}", # cope with either order
- "Last attempt .* was successful",
- "OUTBOUND NEIGHBORS",
- "${BASEDN}",
- "Last success",
- "CN=Configuration,${BASEDN}",
- "Last success",
- "CN=Configuration,${BASEDN}",
- "Last success"],
+ "${BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${BASEDN}", # cope with either order
+ "Last attempt .* was successful",
+ "OUTBOUND NEIGHBORS",
+ "${BASEDN}",
+ "Last success",
+ "CN=Configuration,${BASEDN}",
+ "Last success",
+ "CN=Configuration,${BASEDN}",
+ "Last success"],
ordered=True,
regex=True)
t.cmd_contains("bin/samba-tool drs showrepl ${WIN_HOSTNAME}.${LCREALM} -k yes",
["INBOUND NEIGHBORS",
- "${BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${BASEDN}",
- "Last attempt .* was successful",
- "OUTBOUND NEIGHBORS",
- "${BASEDN}",
- "Last success",
- "CN=Configuration,${BASEDN}",
- "Last success",
- "CN=Configuration,${BASEDN}",
- "Last success"],
+ "${BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${BASEDN}",
+ "Last attempt .* was successful",
+ "OUTBOUND NEIGHBORS",
+ "${BASEDN}",
+ "Last success",
+ "CN=Configuration,${BASEDN}",
+ "Last success",
+ "CN=Configuration,${BASEDN}",
+ "Last success"],
ordered=True,
regex=True)
t.cmd_contains("bin/samba-tool drs showrepl ${HOSTNAME}.${LCREALM}",
["INBOUND NEIGHBORS",
- "OUTBOUND NEIGHBORS",
- "${BASEDN}",
- "Last attempt.*was successful",
- "CN=Configuration,${BASEDN}",
- "Last attempt.*was successful",
- "CN=Configuration,${BASEDN}",
- "Last attempt.*was successful"],
+ "OUTBOUND NEIGHBORS",
+ "${BASEDN}",
+ "Last attempt.*was successful",
+ "CN=Configuration,${BASEDN}",
+ "Last attempt.*was successful",
+ "CN=Configuration,${BASEDN}",
+ "Last attempt.*was successful"],
ordered=True,
regex=True)
t.cmd_contains("bin/samba-tool drs showrepl ${WIN_HOSTNAME}.${WIN_REALM} -k yes",
["INBOUND NEIGHBORS",
- "OUTBOUND NEIGHBORS",
- "${WIN_BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${WIN_BASEDN}",
- "Last attempt .* was successful",
- "CN=Configuration,${WIN_BASEDN}",
- "Last attempt .* was successful"],
+ "OUTBOUND NEIGHBORS",
+ "${WIN_BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${WIN_BASEDN}",
+ "Last attempt .* was successful",
+ "CN=Configuration,${WIN_BASEDN}",
+ "Last attempt .* was successful"],
ordered=True,
regex=True)