m.dn = ldb.Dn(l, "dc=foo4")
m["bla"] = b"bla"
self.assertEqual(len(l.search()), 0)
- self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
+ self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
def test_add_dict(self):
l = ldb.Ldb(self.url(), flags=self.flags())
def test_no_crash_broken_expr(self):
l = ldb.Ldb(self.url(), flags=self.flags())
- self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
+ self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
# Run the SimpleLdb tests against an lmdb backend
class SimpleLdbLmdb(SimpleLdb):
if node.text and node.text.startswith('|Operation'):
# Strip first and last |
updates = [x[1:len(x) - 1].split('|') for x in
- ET.tostring(node,method='text').splitlines()]
+ ET.tostring(node, method='text').splitlines()]
for update in updates[2:]:
output = re.match('Operation (\d+): {(.*)}', update[0])
if output:
}
# separated by commas in docs, and must be broken up
-multivalued_attrs = set(["auxiliaryclass","maycontain","mustcontain","posssuperiors",
- "systemauxiliaryclass","systemmaycontain","systemmustcontain",
+multivalued_attrs = set(["auxiliaryclass", "maycontain", "mustcontain", "posssuperiors",
+ "systemauxiliaryclass", "systemmaycontain", "systemmustcontain",
"systemposssuperiors"])
def __read_folded_line(f, buffer):
# This help formatter does text wrapping and preserves newlines
class PlainHelpFormatter(optparse.IndentedHelpFormatter):
- def format_description(self,description=""):
+ def format_description(self, description=""):
desc_width = self.width - self.current_indent
indent = " " * self.current_indent
paragraphs = description.split('\n')
usage=self.synopsis,
description=self.full_description,
formatter=PlainHelpFormatter(),
- prog=prog,epilog=epilog)
+ prog=prog, epilog=epilog)
parser.add_options(self.takes_options)
optiongroups = {}
for name, optiongroup in self.takes_optiongroups.items():
samdb_schema = SamDB(session_info=system_session(), url=None,
credentials=creds, lp=lp)
- scope_map = {"SUB": ldb.SCOPE_SUBTREE, "BASE": ldb.SCOPE_BASE, "ONE":ldb.SCOPE_ONELEVEL}
+ scope_map = {"SUB": ldb.SCOPE_SUBTREE, "BASE": ldb.SCOPE_BASE, "ONE": ldb.SCOPE_ONELEVEL}
scope = scope.upper()
if not scope in scope_map:
raise CommandError("Unknown scope %s" % scope)
takes_options = [
Option('--client-version', help='Client Version',
default='longhorn', metavar='w2k|dotnet|longhorn',
- choices=['w2k','dotnet','longhorn'], dest='cli_ver'),
+ choices=['w2k', 'dotnet', 'longhorn'], dest='cli_ver'),
]
def run(self, server, cli_ver, sambaopts=None, credopts=None,
takes_options = [
Option('--client-version', help='Client Version',
default='longhorn', metavar='w2k|dotnet|longhorn',
- choices=['w2k','dotnet','longhorn'], dest='cli_ver'),
+ choices=['w2k', 'dotnet', 'longhorn'], dest='cli_ver'),
]
def run(self, server, zone, cli_ver, sambaopts=None, credopts=None,
takes_options = [
Option('--client-version', help='Client Version',
default='longhorn', metavar='w2k|dotnet|longhorn',
- choices=['w2k','dotnet','longhorn'], dest='cli_ver'),
+ choices=['w2k', 'dotnet', 'longhorn'], dest='cli_ver'),
Option('--primary', help='List primary zones (default)',
action='store_true', dest='primary'),
Option('--secondary', help='List secondary zones',
takes_options = [
Option('--client-version', help='Client Version',
default='longhorn', metavar='w2k|dotnet|longhorn',
- choices=['w2k','dotnet','longhorn'], dest='cli_ver')
+ choices=['w2k', 'dotnet', 'longhorn'], dest='cli_ver')
]
def run(self, server, zone, cli_ver, sambaopts=None, credopts=None,
def run(self, server, zone, name, rtype, data, sambaopts=None,
credopts=None, versionopts=None):
- if rtype.upper() not in ('A','AAAA','PTR','CNAME','NS','MX','SRV','TXT'):
+ if rtype.upper() not in ('A', 'AAAA', 'PTR', 'CNAME', 'NS', 'MX', 'SRV', 'TXT'):
raise CommandError('Adding record of type %s is not supported' % rtype)
record_type = dns_type_flag(rtype)
def run(self, server, zone, name, rtype, olddata, newdata,
sambaopts=None, credopts=None, versionopts=None):
- if rtype.upper() not in ('A','AAAA','PTR','CNAME','NS','MX','SOA','SRV','TXT'):
+ if rtype.upper() not in ('A', 'AAAA', 'PTR', 'CNAME', 'NS', 'MX', 'SOA', 'SRV', 'TXT'):
raise CommandError('Updating record of type %s is not supported' % rtype)
record_type = dns_type_flag(rtype)
def run(self, server, zone, name, rtype, data, sambaopts=None, credopts=None, versionopts=None):
- if rtype.upper() not in ('A','AAAA','PTR','CNAME','NS','MX','SRV','TXT'):
+ if rtype.upper() not in ('A', 'AAAA', 'PTR', 'CNAME', 'NS', 'MX', 'SRV', 'TXT'):
raise CommandError('Deleting record of type %s is not supported' % rtype)
record_type = dns_type_flag(rtype)
p = subprocess.Popen([testparm, '-s', '-l',
'--parameter-name=%s' % varname, smbconf],
stdout=subprocess.PIPE, stderr=errfile)
- (out,err) = p.communicate()
+ (out, err) = p.communicate()
errfile.close()
lines = out.split('\n')
if lines:
]
ntvfs_options = [
- Option("--use-xattrs", type="choice", choices=["yes","no","auto"],
+ Option("--use-xattrs", type="choice", choices=["yes", "no", "auto"],
metavar="[yes|no|auto]",
help="Define if we should use the native fs capabilities or a tdb file for "
"storing attributes likes ntacl when --use-ntvfs is set. "
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
- Option("--complexity", type="choice", choices=["on","off","default"],
+ Option("--complexity", type="choice", choices=["on", "off", "default"],
help="The password complexity (on | off | default). Default is 'on'"),
- Option("--store-plaintext", type="choice", choices=["on","off","default"],
+ Option("--store-plaintext", type="choice", choices=["on", "off", "default"],
help="Store plaintext passwords where account have 'store passwords with reversible encryption' set (on | off | default). Default is 'off'"),
Option("--history-length",
help="The password history length (<integer> | default). Default is 24.", type=str),
]
ntvfs_options = [
- Option("--use-xattrs", type="choice", choices=["yes","no","auto"],
+ Option("--use-xattrs", type="choice", choices=["yes", "no", "auto"],
metavar="[yes|no|auto]",
help="Define if we should use the native fs capabilities or a tdb file for "
"storing attributes likes ntacl when --use-ntvfs is set. "
("DRSUAPI_SUPPORTED_EXTENSION_ADDENTRY_V2", "DRS_EXT_ADDENTRY_V2"),
("DRSUAPI_SUPPORTED_EXTENSION_LINKED_VALUE_REPLICATION", "DRS_EXT_LINKED_VALUE_REPLICATION"),
("DRSUAPI_SUPPORTED_EXTENSION_DCINFO_V2", "DRS_EXT_DCINFO_V2"),
- ("DRSUAPI_SUPPORTED_EXTENSION_INSTANCE_TYPE_NOT_REQ_ON_MOD","DRS_EXT_INSTANCE_TYPE_NOT_REQ_ON_MOD"),
+ ("DRSUAPI_SUPPORTED_EXTENSION_INSTANCE_TYPE_NOT_REQ_ON_MOD", "DRS_EXT_INSTANCE_TYPE_NOT_REQ_ON_MOD"),
("DRSUAPI_SUPPORTED_EXTENSION_CRYPTO_BIND", "DRS_EXT_CRYPTO_BIND"),
("DRSUAPI_SUPPORTED_EXTENSION_GET_REPL_INFO", "DRS_EXT_GET_REPL_INFO"),
("DRSUAPI_SUPPORTED_EXTENSION_STRONG_ENCRYPTION", "DRS_EXT_STRONG_ENCRYPTION"),
res = samdb.search(base=trusteedn, expression="(objectClass=*)",
scope=SCOPE_BASE)
assert(len(res) == 1)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def modify_descriptor(self, samdb, object_dn, desc, controls=None):
assert(isinstance(desc, security.descriptor))
def get_domain_sid(self, samdb):
res = samdb.search(base=samdb.domain_dn(),
expression="(objectClass=*)", scope=SCOPE_BASE)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def add_ace(self, samdb, object_dn, new_ace):
"""Add new ace explicitly."""
try:
msg = self.samdb.search(expression='(&(|(samAccountName=%s)(samAccountName=%s$))(objectClass=User))' %
- (ldb.binary_encode(username),ldb.binary_encode(username)))
+ (ldb.binary_encode(username), ldb.binary_encode(username)))
user_dn = msg[0].dn
except Exception:
raise CommandError("Failed to find account %s" % username)
def find_domain_sid(self):
res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def find_servers(self):
"""
# Attributes that contain the Domain name e.g. 'samba.org'
self.domain_attributes = [
"proxyAddresses", "mail", "userPrincipalName", "msExchSmtpFullyQualifiedDomainName",
- "dnsHostName", "networkAddress", "dnsRoot", "servicePrincipalName",]
+ "dnsHostName", "networkAddress", "dnsRoot", "servicePrincipalName", ]
self.domain_attributes = [x.upper() for x in self.domain_attributes]
#
# May contain DOMAIN_NETBIOS and SERVER_NAME
self.servername_attributes = ["distinguishedName", "name", "CN", "sAMAccountName", "dNSHostName",
"servicePrincipalName", "rIDSetReferences", "serverReference", "serverReferenceBL",
- "msDS-IsDomainFor", "interSiteTopologyGenerator",]
+ "msDS-IsDomainFor", "interSiteTopologyGenerator", ]
self.servername_attributes = [x.upper() for x in self.servername_attributes]
#
- self.netbios_attributes = ["servicePrincipalName", "CN", "distinguishedName", "nETBIOSName", "name",]
+ self.netbios_attributes = ["servicePrincipalName", "CN", "distinguishedName", "nETBIOSName", "name", ]
self.netbios_attributes = [x.upper() for x in self.netbios_attributes]
#
- self.other_attributes = ["name", "DC",]
+ self.other_attributes = ["name", "DC", ]
self.other_attributes = [x.upper() for x in self.other_attributes]
#
self.ignore_attributes = [x.upper() for x in self.ignore_attributes]
con1 = LDAPBase(URL1, creds, lp,
two=two, quiet=quiet, descriptor=descriptor, sort_aces=sort_aces,
- verbose=verbose,view=view, base=base, scope=scope,
+ verbose=verbose, view=view, base=base, scope=scope,
outf=self.outf, errf=self.errf)
assert len(con1.base_dn) > 0
from samba.credentials import DONT_USE_KERBEROS
import samba.getopt as options
from samba.dcerpc import security, idmap
-from samba.ntacls import setntacl, getntacl,getdosinfo
+from samba.ntacls import setntacl, getntacl, getdosinfo
from samba import Ldb
from samba.ndr import ndr_unpack, ndr_print
from samba.samdb import SamDB
takes_options = [
Option("-q", "--quiet", help="Be quiet", action="store_true"),
Option("--xattr-backend", type="choice", help="xattr backend type (native fs or tdb)",
- choices=["native","tdb"]),
+ choices=["native", "tdb"]),
Option("--eadb-file", help="Name of the tdb file where attributes are stored", type="string"),
Option("--use-ntvfs", help="Set the ACLs directly to the TDB or xattr for use with the ntvfs file server", action="store_true"),
Option("--use-s3fs", help="Set the ACLs for use with the default s3fs file server via the VFS layer", action="store_true"),
Option("--service", help="Name of the smb.conf service to use when applying the ACLs", type="string")
]
- takes_args = ["acl","file"]
+ takes_args = ["acl", "file"]
def run(self, acl, file, use_ntvfs=False, use_s3fs=False,
- quiet=False,xattr_backend=None,eadb_file=None,
+ quiet=False, xattr_backend=None, eadb_file=None,
credopts=None, sambaopts=None, versionopts=None,
service=None):
logger = self.get_logger()
takes_options = [
Option("--as-sddl", help="Output ACL in the SDDL format", action="store_true"),
Option("--xattr-backend", type="choice", help="xattr backend type (native fs or tdb)",
- choices=["native","tdb"]),
+ choices=["native", "tdb"]),
Option("--eadb-file", help="Name of the tdb file where attributes are stored", type="string"),
Option("--use-ntvfs", help="Get the ACLs directly from the TDB or xattr used with the ntvfs file server", action="store_true"),
Option("--use-s3fs", help="Get the ACLs for use via the VFS layer used by the default s3fs file server", action="store_true"),
# These assertions correct for current ad_dc selftest
# configuration. When other environments have a broad range of
# groups mapped via passdb, we can relax some of these checks
- (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid)
+ (LA_uid, LA_type) = s4_passdb.sid_to_id(LA_sid)
if (LA_type != idmap.ID_TYPE_UID and LA_type != idmap.ID_TYPE_BOTH):
raise CommandError("SID %s is not mapped to a UID" % LA_sid)
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
if (BA_type != idmap.ID_TYPE_GID and BA_type != idmap.ID_TYPE_BOTH):
raise CommandError("SID %s is not mapped to a GID" % BA_sid)
self.samdb_url = H
self.dirsync_filter = dirsync_filter
self.dirsync_attrs = dirsync_attrs
- self.dirsync_controls = ["dirsync:1:0:0","extended_dn:1:0"];
+ self.dirsync_controls = ["dirsync:1:0:0", "extended_dn:1:0"];
self.password_attrs = password_attrs
self.decrypt_samba_gpg = decrypt_samba_gpg
self.sync_command = sync_command
self.current_pid = None
self.outf.write("Initialized cache_ldb[%s]\n" % (cache_ldb))
msgs = self.cache.parse_ldif(add_ldif)
- changetype,msg = next(msgs)
+ changetype, msg = next(msgs)
ldif = self.cache.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write("%s" % ldif)
else:
assert len(res_controls) > 0
assert res_controls[0].oid == "1.2.840.113556.1.4.841"
res_controls[0].critical = True
- self.dirsync_controls = [str(res_controls[0]),"extended_dn:1:0"]
+ self.dirsync_controls = [str(res_controls[0]), "extended_dn:1:0"]
log_msg("dirsyncControls: %r\n" % self.dirsync_controls)
modify_ldif = "dn: %s\n" % (self.cache_dn)
# netbiosname
# Get the netbiosname first (could be obtained from smb.conf in theory)
res = secretsdb.search(expression="(flatname=%s)" %
- names.domain,base="CN=Primary Domains",
+ names.domain, base="CN=Primary Domains",
scope=ldb.SCOPE_SUBTREE, attrs=["sAMAccountName"])
- names.netbiosname = str(res[0]["sAMAccountName"]).replace("$","")
+ names.netbiosname = str(res[0]["sAMAccountName"]).replace("$", "")
names.smbconf = smbconf
current = samdb.search(expression="(objectClass=*)",
base="", scope=ldb.SCOPE_BASE,
attrs=["defaultNamingContext", "schemaNamingContext",
- "configurationNamingContext","rootDomainNamingContext",
+ "configurationNamingContext", "rootDomainNamingContext",
"namingContexts"])
names.configdn = current[0]["configurationNamingContext"][0]
# domain guid/sid
res6 = samdb.search(expression="(objectClass=*)", base=basedn,
scope=ldb.SCOPE_BASE, attrs=["objectGUID",
- "objectSid","msDS-Behavior-Version"])
+ "objectSid", "msDS-Behavior-Version"])
names.domainguid = str(ndr_unpack(misc.GUID, res6[0]["objectGUID"][0]))
names.domainsid = ndr_unpack(security.dom_sid, res6[0]["objectSid"][0])
names.forestsid = ndr_unpack(security.dom_sid, res6[0]["objectSid"][0])
# policy guid
res7 = samdb.search(expression="(name={%s})" % DEFAULT_POLICY_GUID,
base="CN=Policies,CN=System," + basedn,
- scope=ldb.SCOPE_ONELEVEL, attrs=["cn","displayName"])
- names.policyid = str(res7[0]["cn"]).replace("{","").replace("}","")
+ scope=ldb.SCOPE_ONELEVEL, attrs=["cn", "displayName"])
+ names.policyid = str(res7[0]["cn"]).replace("{", "").replace("}", "")
# dc policy guid
res8 = samdb.search(expression="(name={%s})" % DEFAULT_DC_POLICY_GUID,
base="CN=Policies,CN=System," + basedn,
scope=ldb.SCOPE_ONELEVEL,
- attrs=["cn","displayName"])
+ attrs=["cn", "displayName"])
if len(res8) == 1:
- names.policyid_dc = str(res8[0]["cn"]).replace("{","").replace("}","")
+ names.policyid_dc = str(res8[0]["cn"]).replace("{", "").replace("}", "")
else:
names.policyid_dc = None
samdb.add(delta)
-def get_max_usn(samdb,basedn):
+def get_max_usn(samdb, basedn):
""" This function return the biggest USN present in the provision
:param samdb: A LDB object pointing to the sam.ldb
(ie. DC=foo, DC=bar)
:return: The biggest USN in the provision"""
- res = samdb.search(expression="objectClass=*",base=basedn,
- scope=ldb.SCOPE_SUBTREE,attrs=["uSNChanged"],
+ res = samdb.search(expression="objectClass=*", base=basedn,
+ scope=ldb.SCOPE_SUBTREE, attrs=["uSNChanged"],
controls=["search_options:1:2",
"server_sort:1:1:uSNChanged",
"paged_results:1:1"])
:param policyguid: GUID of the default domain policy
:param policyguid_dc: GUID of the default domain controler policy
"""
- policy_path = getpolicypath(sysvolpath,dnsdomain,policyguid)
+ policy_path = getpolicypath(sysvolpath, dnsdomain, policyguid)
create_gpo_struct(policy_path)
- policy_path = getpolicypath(sysvolpath,dnsdomain,policyguid_dc)
+ policy_path = getpolicypath(sysvolpath, dnsdomain, policyguid_dc)
create_gpo_struct(policy_path)
# For now, make these equal
mmr_pass = self.ldapadminpass
- url_list = filter(None,self.ol_mmr_urls.split(','))
+ url_list = filter(None, self.ol_mmr_urls.split(','))
for url in url_list:
self.logger.info("Using LDAP-URL: " + url)
if len(url_list) == 1:
"REFINT_CONFIG": refint_config,
"INDEX_CONFIG": index_config,
"ADMIN_UID": str(os.getuid()),
- "NOSYNC": nosync_config,})
+ "NOSYNC": nosync_config, })
self.setup_db_dir(os.path.join(self.ldapdir, "db", "forestdns"))
self.setup_db_dir(os.path.join(self.ldapdir, "db", "domaindns"))
return os.path.join(setup_dir(), file)
-def setup_add_ldif(ldb, ldif_path, subst_vars=None,controls=["relax:0"]):
+def setup_add_ldif(ldb, ldif_path, subst_vars=None, controls=["relax:0"]):
"""Setup a ldb in the private dir.
:param ldb: LDB file to import data into
ldb.add_ldif(data, controls)
-def setup_modify_ldif(ldb, ldif_path, subst_vars=None,controls=["relax:0"]):
+def setup_modify_ldif(ldb, ldif_path, subst_vars=None, controls=["relax:0"]):
"""Modify a ldb in the private dir.
:param ldb: LDB object.
return attributes
-def get_dnsyntax_attributes(schemadn,schemaldb):
+def get_dnsyntax_attributes(schemadn, schemaldb):
res = schemaldb.search(
expression="(&(!(linkID=*))(objectclass=attributeSchema)(attributeSyntax=2.5.5.1))",
base=schemadn, scope=SCOPE_ONELEVEL,
def dacl_add_ace(self, object_dn, ace):
"""Add an ACE to an objects security descriptor
"""
- desc = self.read_sd_on_dn(object_dn,["show_deleted:1"])
+ desc = self.read_sd_on_dn(object_dn, ["show_deleted:1"])
desc_sddl = desc.as_sddl(self.domain_sid)
if ace in desc_sddl:
return
dn4 = dsdb_Dn(sam, "B:8:00000000:<GUID=ffffffff-17f4-452a-b002-963e1909d101>;OU=dn4,DC=samba,DC=example,DC=com")
dn5 = dsdb_Dn(sam, "<GUID=ffffffff-27f4-452a-b002-963e1909d101>;OU=dn5,DC=samba,DC=example,DC=com")
dn6 = dsdb_Dn(sam, "<GUID=00000000-27f4-452a-b002-963e1909d101>;OU=dn6,DC=samba,DC=example,DC=com")
- unsorted_links14 = [dn1,dn2,dn3,dn4]
+ unsorted_links14 = [dn1, dn2, dn3, dn4]
sorted_vals14 = [str(dn) for dn in sorted(unsorted_links14)]
self.assertEquals(sorted_vals14[0], str(dn3))
self.assertEquals(sorted_vals14[1], str(dn2))
self.assertEquals(sorted_vals14[2], str(dn1))
self.assertEquals(sorted_vals14[3], str(dn4))
- unsorted_links56 = [dn5,dn6]
+ unsorted_links56 = [dn5, dn6]
sorted_vals56 = [str(dn) for dn in sorted(unsorted_links56)]
self.assertEquals(sorted_vals56[0], str(dn6))
self.assertEquals(sorted_vals56[1], str(dn5))
# With a known but wrong syntax we get a protocol error
# see test_no_auth_presentation_ctx_valid2
- tsf1b_list = [zero_syntax,samba.dcerpc.epmapper.abstract_syntax(),ndr64]
+ tsf1b_list = [zero_syntax, samba.dcerpc.epmapper.abstract_syntax(), ndr64]
ctx1b = dcerpc.ctx_list()
ctx1b.context_id = 1
ctx1b.num_transfer_syntaxes = len(tsf1b_list)
# With a unknown but wrong syntaxes we get NO protocol error
# see test_no_auth_presentation_ctx_invalid4
- tsf1b_list = [zero_syntax,samba.dcerpc.epmapper.abstract_syntax()]
+ tsf1b_list = [zero_syntax, samba.dcerpc.epmapper.abstract_syntax()]
ctx1b = dcerpc.ctx_list()
ctx1b.context_id = 1
ctx1b.num_transfer_syntaxes = len(tsf1b_list)
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- tsf1_list = [zero_syntax,ndr32]
+ tsf1_list = [zero_syntax, ndr32]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- tsf2_list = [ndr32,ndr32]
+ tsf2_list = [ndr32, ndr32]
ctx2 = dcerpc.ctx_list()
ctx2.context_id = 2
ctx2.num_transfer_syntaxes = len(tsf2_list)
ctx4.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
ctx4.transfer_syntaxes = tsf4_list
- req = self.generate_alter(call_id=34, ctx_list=[ctx3,ctx4])
+ req = self.generate_alter(call_id=34, ctx_list=[ctx3, ctx4])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- req = self.generate_alter(call_id=43, ctx_list=[ctx4,ctx3])
+ req = self.generate_alter(call_id=43, ctx_list=[ctx4, ctx3])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- req = self.generate_alter(call_id=44, ctx_list=[ctx4,ctx4])
+ req = self.generate_alter(call_id=44, ctx_list=[ctx4, ctx4])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
ctx5epm.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
ctx5epm.transfer_syntaxes = tsf5epm_list
- req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt,ctx5epm])
+ req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt, ctx5epm])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt,ctx5epm])
+ req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt, ctx5epm])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
zero_syntax = misc.ndr_syntax_id()
ndr64 = base.transfer_syntax_ndr64()
- tsf1_list = [btf1,btf2,zero_syntax]
+ tsf1_list = [btf1, btf2, zero_syntax]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
zero_syntax = misc.ndr_syntax_id()
- tsf1_list = [zero_syntax,btf1,btf2,zero_syntax]
+ tsf1_list = [zero_syntax, btf1, btf2, zero_syntax]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
ctx2.abstract_syntax = zero_syntax
ctx2.transfer_syntaxes = tsf2_list
- req = self.generate_bind(call_id=0, ctx_list=[ctx1,ctx2])
+ req = self.generate_bind(call_id=0, ctx_list=[ctx1, ctx2])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
zero_syntax = misc.ndr_syntax_id()
ndr64 = base.transfer_syntax_ndr64()
- tsf1_list = [btf1,btf2,zero_syntax]
+ tsf1_list = [btf1, btf2, zero_syntax]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
opnum=0,
alloc_hint=0xffffffff,
stub="\00" * chunk_size)
- self.send_pdu(req,ndr_print=True,hexdump=True)
- rep = self.recv_pdu(ndr_print=True,hexdump=True)
+ self.send_pdu(req, ndr_print=True, hexdump=True)
+ rep = self.recv_pdu(ndr_print=True, hexdump=True)
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
auth_length=0)
self.assertNotEquals(rep.u.alloc_hint, 0)
alloc_hint -= thistime
else:
alloc_hint = 0
- self.send_pdu(req,hexdump=False)
+ self.send_pdu(req, hexdump=False)
if fault_first is not None:
rep = self.recv_pdu()
# We get a fault back
ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
ctx1.transfer_syntaxes = tsf1_list
- tsf1b_list = [ndr32,ndr64]
+ tsf1b_list = [ndr32, ndr64]
ctx1b = dcerpc.ctx_list()
ctx1b.context_id = 1
ctx1b.num_transfer_syntaxes = len(tsf1b_list)
ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
ctx1.transfer_syntaxes = tsf1_list
- tsf1b_list = [ndr32,ndr64]
+ tsf1b_list = [ndr32, ndr64]
ctx1b = dcerpc.ctx_list()
ctx1b.context_id = 1
ctx1b.num_transfer_syntaxes = len(tsf1b_list)
ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
ctx1.transfer_syntaxes = tsf1_list
- tsf1b_list = [ndr32,ndr64]
+ tsf1b_list = [ndr32, ndr64]
ctx1b = dcerpc.ctx_list()
ctx1b.context_id = 1
ctx1b.num_transfer_syntaxes = len(tsf1b_list)
time.sleep(0.5)
self.connect()
- ack2 = self.do_generic_bind(ctx=ctx,assoc_group_id=ack.u.assoc_group_id,
+ ack2 = self.do_generic_bind(ctx=ctx, assoc_group_id=ack.u.assoc_group_id,
nak_reason=dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED)
return
floor5.lhs = lhs5
floor5.rhs.ipaddr = "0.0.0.0"
- floors = [floor1,floor2,floor3,floor4,floor5]
+ floors = [floor1, floor2, floor3, floor4, floor5]
req_tower = samba.dcerpc.epmapper.epm_tower()
req_tower.num_floors = len(floors)
req_tower.floors = floors
try:
rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
if rep_pdu is None:
- return (None,None)
+ return (None, None)
rep = ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
if ndr_print:
sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
self.assertEquals(2, self.conn.AddOne(1))
def test_echodata(self):
- self.assertEquals([1,2,3], self.conn.EchoData([1, 2, 3]))
+ self.assertEquals([1, 2, 3], self.conn.EchoData([1, 2, 3]))
def test_call(self):
self.assertEquals(u"foobar", self.conn.TestCall(u"foobar"))
def test_surrounding(self):
surrounding_struct = echo.Surrounding()
surrounding_struct.x = 4
- surrounding_struct.surrounding = [1,2,3,4]
+ surrounding_struct.surrounding = [1, 2, 3, 4]
y = self.conn.TestSurrounding(surrounding_struct)
self.assertEquals(8 * [0], y.surrounding)
'panic action', 'homedir map', 'NIS homedir',
'server string', 'netbios name', 'socket options', 'use mmap',
'ctdbd socket', 'printing', 'printcap name', 'queueresume command',
- 'queuepause command','lpresume command', 'lppause command',
+ 'queuepause command', 'lpresume command', 'lppause command',
'lprm command', 'lpq command', 'print command', 'template homedir',
'max open files',
'include system krb5 conf', 'rpc server dynamic port range',
'bytes': '10',
'octal': '0123',
'ustring': 'ustring',
- 'enum':'', 'boolean-auto': '', 'char': 'a', 'list': 'a, b, c'}
+ 'enum': '', 'boolean-auto': '', 'char': 'a', 'list': 'a, b, c'}
opposite_arbitrary = {'string': 'string2', 'boolean': 'no', 'integer': '6',
'boolean-rev': 'no',
'cmdlist': 'd e f',
'bytes': '11',
'octal': '0567',
'ustring': 'ustring2',
- 'enum':'', 'boolean-auto': '', 'char': 'b', 'list': 'd, e, f'}
+ 'enum': '', 'boolean-auto': '', 'char': 'b', 'list': 'd, e, f'}
failset = set()
count = 0
backenddb.transaction_start()
- backenddb.add({"dn":"@DSDB_LOCK_TEST"})
+ backenddb.add({"dn": "@DSDB_LOCK_TEST"})
backenddb.delete("@DSDB_LOCK_TEST")
# Obtain a write lock
os.write(w1, b"started")
self.assertEqual(os.read(r2, 3), b"add")
- backenddb.add({"dn":"@DSDB_LOCK_TEST"})
+ backenddb.add({"dn": "@DSDB_LOCK_TEST"})
backenddb.delete("@DSDB_LOCK_TEST")
os.write(w1, b"added")
adminDescription: """ + attr_name + """
adminDisplayName: """ + attr_name + """
cn: """ + attr_name + """
-attributeId: 1.3.6.1.4.1.7165.4.6.1.8.%d.""" % sub_oid + str(random.randint(1,100000)) + """
+attributeId: 1.3.6.1.4.1.7165.4.6.1.8.%d.""" % sub_oid + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.12
omSyntax: 64
instanceType: 4
def test_iter(self):
self.assertEquals([], list(self._get_shares({})))
- self.assertEquals([], list(self._get_shares({"global":{}})))
+ self.assertEquals([], list(self._get_shares({"global": {}})))
self.assertEquals(
["bla"],
- list(self._get_shares({"global":{}, "bla":{}})))
+ list(self._get_shares({"global": {}, "bla": {}})))
def test_len(self):
shares = self._get_shares({"global": {}})
#
up = ndr_unpack(drsblobs.package_PrimaryUserPasswordBlob,
binascii.a2b_hex(up_package.data))
- self.checkUserPassword(up, [("{CRYPT}", "6",10000)])
+ self.checkUserPassword(up, [("{CRYPT}", "6", 10000)])
self.checkNtHash(USER_PASS, up.current_nt_hash.hash)
#
up = ndr_unpack(drsblobs.package_PrimaryUserPasswordBlob,
binascii.a2b_hex(up_package.data))
- self.checkUserPassword(up, [("{CRYPT}", "6",None)])
+ self.checkUserPassword(up, [("{CRYPT}", "6", None)])
self.checkNtHash(USER_PASS, up.current_nt_hash.hash)
def test_supplementalCredentials_cleartext(self):
#
up = ndr_unpack(drsblobs.package_PrimaryUserPasswordBlob,
binascii.a2b_hex(up_package.data))
- self.checkUserPassword(up, [("{CRYPT}", "5",100)])
+ self.checkUserPassword(up, [("{CRYPT}", "5", 100)])
self.checkNtHash(USER_PASS, up.current_nt_hash.hash)
session_info=self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=True)
anysid = security.dom_sid(security.SID_NT_SELF)
- self.assertEquals(facl.as_sddl(anysid),acl)
+ self.assertEquals(facl.as_sddl(anysid), acl)
def test_setntacl_smbd_setposixacl_getntacl(self):
acl = ACL
session_info=self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
- self.assertEquals(facl.as_sddl(anysid),acl)
+ self.assertEquals(facl.as_sddl(anysid), acl)
def test_setntacl_smbd_getntacl_smbd(self):
acl = ACL
session_info=self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
- self.assertEquals(facl.as_sddl(anysid),acl)
+ self.assertEquals(facl.as_sddl(anysid), acl)
def test_setntacl_smbd_setposixacl_getntacl_smbd(self):
acl = ACL
session_info=self.get_session_info())
# This invalidates the hash of the NT acl just set because there is a hook in the posix ACL set code
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
smbd.set_simple_acl(self.tempf, 0o640, BA_gid)
# This should re-calculate an ACL based on the posix details
- facl = getntacl(self.lp,self.tempf, direct_db_access=False)
+ facl = getntacl(self.lp, self.tempf, direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid))
session_info=self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
domsid = security.dom_sid(DOM_SID)
- self.assertEquals(facl.as_sddl(domsid),acl)
+ self.assertEquals(facl.as_sddl(domsid), acl)
def test_setntacl_getposixacl(self):
acl = ACL
session_info=self.get_session_info())
facl = getntacl(self.lp, self.tempf)
anysid = security.dom_sid(security.SID_NT_SELF)
- self.assertEquals(facl.as_sddl(anysid),acl)
+ self.assertEquals(facl.as_sddl(anysid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
def test_setposixacl_getntacl(self):
user_SID = s4_passdb.uid_to_sid(os.stat(self.tempdir).st_uid)
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
- (BA_id,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_id, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS)
- (SO_id,SO_type) = s4_passdb.sid_to_id(SO_sid)
+ (SO_id, SO_type) = s4_passdb.sid_to_id(SO_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
smbd.chown(self.tempdir, BA_id, SO_id)
smbd.set_simple_acl(self.tempdir, 0o750)
def test_setposixacl_group_getntacl_smbd(self):
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
group_SID = s4_passdb.gid_to_sid(os.stat(self.tempf).st_gid)
user_SID = s4_passdb.uid_to_sid(os.stat(self.tempf).st_uid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
def test_setposixacl_group_getposixacl(self):
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
smbd.set_simple_acl(self.tempf, 0o670, BA_gid)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
setntacl(self.lp, self.tempf, acl, str(domsid), use_ntvfs=False,
session_info=session_info)
facl = getntacl(self.lp, self.tempf)
- self.assertEquals(facl.as_sddl(domsid),acl)
+ self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
nwrap_module_so_path = os.getenv('NSS_WRAPPER_MODULE_SO_PATH')
# These assertions correct for current ad_dc selftest
# configuration. When other environments have a broad range of
# groups mapped via passdb, we can relax some of these checks
- (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid)
+ (LA_uid, LA_type) = s4_passdb.sid_to_id(LA_sid)
self.assertEquals(LA_type, idmap.ID_TYPE_UID)
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
- (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid)
+ (SO_gid, SO_type) = s4_passdb.sid_to_id(SO_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid)
+ (SY_gid, SY_type) = s4_passdb.sid_to_id(SY_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid)
+ (AU_gid, AU_type) = s4_passdb.sid_to_id(AU_sid)
self.assertEquals(AU_type, idmap.ID_TYPE_BOTH)
self.assertEquals(posix_acl.count, 13, self.print_posix_acl(posix_acl))
setntacl(self.lp, self.tempdir, acl, str(domsid), use_ntvfs=False,
session_info=session_info)
facl = getntacl(self.lp, self.tempdir)
- self.assertEquals(facl.as_sddl(domsid),acl)
+ self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
# These assertions correct for current ad_dc selftest
# configuration. When other environments have a broad range of
# groups mapped via passdb, we can relax some of these checks
- (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid)
+ (LA_uid, LA_type) = s4_passdb.sid_to_id(LA_sid)
self.assertEquals(LA_type, idmap.ID_TYPE_UID)
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
- (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid)
+ (SO_gid, SO_type) = s4_passdb.sid_to_id(SO_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid)
+ (SY_gid, SY_type) = s4_passdb.sid_to_id(SY_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid)
+ (AU_gid, AU_type) = s4_passdb.sid_to_id(AU_sid)
self.assertEquals(AU_type, idmap.ID_TYPE_BOTH)
self.assertEquals(posix_acl.count, 13, self.print_posix_acl(posix_acl))
setntacl(self.lp, self.tempdir, acl, str(domsid), use_ntvfs=False,
session_info=session_info)
facl = getntacl(self.lp, self.tempdir)
- self.assertEquals(facl.as_sddl(domsid),acl)
+ self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
# These assertions correct for current ad_dc selftest
# configuration. When other environments have a broad range of
# groups mapped via passdb, we can relax some of these checks
- (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid)
+ (LA_uid, LA_type) = s4_passdb.sid_to_id(LA_sid)
self.assertEquals(LA_type, idmap.ID_TYPE_UID)
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
- (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid)
+ (SO_gid, SO_type) = s4_passdb.sid_to_id(SO_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid)
+ (SY_gid, SY_type) = s4_passdb.sid_to_id(SY_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid)
+ (AU_gid, AU_type) = s4_passdb.sid_to_id(AU_sid)
self.assertEquals(AU_type, idmap.ID_TYPE_BOTH)
- (PA_gid,PA_type) = s4_passdb.sid_to_id(PA_sid)
+ (PA_gid, PA_type) = s4_passdb.sid_to_id(PA_sid)
self.assertEquals(PA_type, idmap.ID_TYPE_BOTH)
self.assertEquals(posix_acl.count, 15, self.print_posix_acl(posix_acl))
setntacl(self.lp, self.tempf, acl, str(domsid), use_ntvfs=False,
session_info=session_info)
facl = getntacl(self.lp, self.tempf)
- self.assertEquals(facl.as_sddl(domsid),acl)
+ self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
nwrap_module_so_path = os.getenv('NSS_WRAPPER_MODULE_SO_PATH')
# These assertions correct for current ad_dc selftest
# configuration. When other environments have a broad range of
# groups mapped via passdb, we can relax some of these checks
- (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid)
+ (LA_uid, LA_type) = s4_passdb.sid_to_id(LA_sid)
self.assertEquals(LA_type, idmap.ID_TYPE_UID)
- (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid)
+ (BA_gid, BA_type) = s4_passdb.sid_to_id(BA_sid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
- (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid)
+ (SO_gid, SO_type) = s4_passdb.sid_to_id(SO_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid)
+ (SY_gid, SY_type) = s4_passdb.sid_to_id(SY_sid)
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
- (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid)
+ (AU_gid, AU_type) = s4_passdb.sid_to_id(AU_sid)
self.assertEquals(AU_type, idmap.ID_TYPE_BOTH)
- (PA_gid,PA_type) = s4_passdb.sid_to_id(PA_sid)
+ (PA_gid, PA_type) = s4_passdb.sid_to_id(PA_sid)
self.assertEquals(PA_type, idmap.ID_TYPE_BOTH)
self.assertEquals(posix_acl.count, 15, self.print_posix_acl(posix_acl))
#
msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
scope=SCOPE_BASE,
- attrs=['foo','blah','cn','showInAdvancedViewOnly'])
+ attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
self.assertEquals(len(msg), 1)
self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
self.assertEquals(str(msg[0]["foo"]), "bar")
# Checking for existence of record (remote)
msg = self.ldb.search(expression="(unixName=bin)",
- attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
+ attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
self.assertEquals(len(msg), 1)
self.assertEquals(str(msg[0]["cn"]), "Niemand")
self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
# Checking for existence of record (local && remote)
msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
- attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
+ attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
self.assertEquals(len(msg), 1) # TODO: should check with more records
self.assertEquals(str(msg[0]["cn"]), "Niemand")
self.assertEquals(str(msg[0]["unixName"]), "bin")
# Checking for existence of record (local || remote)
msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
- attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
+ attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
#print "got %d replies" % len(msg)
self.assertEquals(len(msg), 1) # TODO: should check with more records
self.assertEquals(str(msg[0]["cn"]), "Niemand")
self.assertEquals(str(res[4]["lastLogon"]), "z")
# Clean up
- dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
+ dns = [self.samba4.dn("cn=%s" % n) for n in ["A", "B", "C", "X", "Y", "Z"]]
for dn in dns:
self.ldb.delete(dn)
"::1",
"::",
"1:1:1:1:1:1:1:1"],
- "PTR":good_dns,
- "CNAME":good_dns,
- "NS":good_dns,
- "MX":good_mx,
- "SRV":good_srv,
- "TXT":["text", "", "@#!", "\n"]
+ "PTR": good_dns,
+ "CNAME": good_dns,
+ "NS": good_dns,
+ "MX": good_mx,
+ "SRV": good_srv,
+ "TXT": ["text", "", "@#!", "\n"]
}
self.bad_records = {
"1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
"1234:5678:9ABC:DEF0:1234:5678:9ABC",
"1111::1111::1111"],
- "PTR":bad_dns,
- "CNAME":bad_dns,
- "NS":bad_dns,
- "MX":bad_mx,
- "SRV":bad_srv
+ "PTR": bad_dns,
+ "CNAME": bad_dns,
+ "NS": bad_dns,
+ "MX": bad_mx,
+ "SRV": bad_srv
}
def tearDown(self):
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("dsheuristics: <NO VALUE>", out)
def test_modify_dsheuristics(self):
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("set dsheuristics: 0000002", out)
(result, out, err) = self.runsubcmd("fsmo", "show")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
# Check that the output is sensible
samdb = self.getSamDB("-H", "ldap://%s" % os.environ["SERVER"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","There shouldn't be any error message")
+ self.assertEquals(err, "", "There shouldn't be any error message")
self.assertIn("Added group %s" % group["name"], out)
found = self._find_group(group["name"])
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("dn: CN=Domain Users,CN=Users,DC=samba,DC=example,DC=com", out)
def _randomGroup(self, base={}):
(result, out, err) = self.runsubcmd("domain", "join", os.environ["REALM"], "dc", "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
self.assertCmdFail(result)
- self.assertTrue("Not removing account" in err,"Should fail with exception")
+ self.assertTrue("Not removing account" in err, "Should fail with exception")
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
def test_s3fs(self):
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
def test_ntvfs_check(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
def test_s3fs_check(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
class NtACLCmdGetSetTestCase(SambaToolCmdTest):
"""Tests for samba-tool ntacl get/set subcommands"""
def test_ntvfs(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
+ tempf = os.path.join(path, "pytests" + str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
def test_s3fs(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
+ tempf = os.path.join(path, "pytests" + str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
def test_ntvfs_check(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
+ tempf = os.path.join(path, "pytests" + str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(out,"","Shouldn't be any output messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-ntvfs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertEquals(self.acl + "\n", out, "Output should be the ACL")
def test_s3fs_check(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests" + str(int(100000 *random.random())))
+ tempf = os.path.join(path, "pytests" + str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(out,"","Shouldn't be any output messages")
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(out, "", "Shouldn't be any output messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-s3fs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(self.acl + "\n", out,"Output should be the ACL")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
+ self.assertEquals(self.acl + "\n", out, "Output should be the ACL")
self.creds.guess(self.lp)
self.session = system_session()
self.ldb = SamDB("ldap://" + os.environ["DC_SERVER"],
- session_info=self.session, credentials=self.creds,lp=self.lp)
+ session_info=self.session, credentials=self.creds, lp=self.lp)
self.base_dn = self.ldb.domain_dn()
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("dn: CN=uid,CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com", out)
def test_modify_attribute_searchflags(self):
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("modified cn=uid,CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com", out)
(result, out, err) = self.runsublevelcmd("schema", ("attribute",
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("modified cn=uid,CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com", out)
(result, out, err) = self.runsublevelcmd("schema", ("attribute",
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("modified cn=uid,CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com", out)
def test_show_oc_attribute(self):
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("--- MAY contain ---", out)
self.assertIn("--- MUST contain ---", out)
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("dn: CN=Person,CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com", out)
(result, out, err) = user["createUserFn"](user)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
user["checkUserFn"](user)
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
found = self._find_user(user["name"])
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
- self.assertEquals(err,"","setpassword with url")
+ self.assertEquals(err, "", "setpassword with url")
self.assertMatch(out, "Changed password OK", "setpassword with url")
attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
- self.assertEqual(err,"","getpassword without url")
+ self.assertEqual(err, "", "getpassword without url")
cache_attrs = {
"objectClass": {"value": "userSyncPasswords"},
"samdbUrl": {},
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
- self.assertEqual(err,"","syncpasswords --no-wait")
+ self.assertEqual(err, "", "syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
for user in self.users:
user["name"],
"--newpassword=%s" % newpasswd)
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
- self.assertEquals(err,"","setpassword without url")
+ self.assertEquals(err, "", "setpassword without url")
self.assertMatch(out, "Changed password OK", "setpassword without url")
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
- self.assertEqual(err,"","syncpasswords --no-wait")
+ self.assertEqual(err, "", "syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
- self.assertEqual(err,"","getpassword without url")
+ self.assertEqual(err, "", "getpassword without url")
self.assertMatch(out, "Got password OK", "getpassword without url")
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
- self.assertEquals(err,"","setpassword with forced change")
+ self.assertEquals(err, "", "setpassword with forced change")
self.assertMatch(out, "Changed password OK", "setpassword with forced change")
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
self._check_posix_user(user)
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
self._check_posix_user(user)
self.assertEquals(desc.type, 0x8004)
def test_from_sddl_invalidsddl(self):
- self.assertRaises(TypeError,security.descriptor.from_sddl, "foo",security.dom_sid("S-2-0-0"))
+ self.assertRaises(TypeError, security.descriptor.from_sddl, "foo", security.dom_sid("S-2-0-0"))
def test_from_sddl_invalidtype1(self):
- self.assertRaises(TypeError, security.descriptor.from_sddl, security.dom_sid('S-2-0-0-512'),security.dom_sid("S-2-0-0"))
+ self.assertRaises(TypeError, security.descriptor.from_sddl, security.dom_sid('S-2-0-0-512'), security.dom_sid("S-2-0-0"))
def test_from_sddl_invalidtype2(self):
sddl = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"
text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)"
dom = security.dom_sid("S-2-0-0")
desc1 = security.descriptor.from_sddl(text, dom)
- self.assertRaises(TypeError, desc1.as_sddl,text)
+ self.assertRaises(TypeError, desc1.as_sddl, text)
def test_as_sddl_no_domainsid(self):
test_literal_bytes_embed_nulls = b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
binary_contents = b'\xff\xfe'
binary_contents = binary_contents + "Hello cruel world of python3".encode('utf8') * 128
-test_dir = os.path.join(addom, 'testing_%d' % random.randint(0,0xFFFF))
+test_dir = os.path.join(addom, 'testing_%d' % random.randint(0, 0xFFFF))
test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
class SMBTests(samba.tests.TestCase):
ls = [f['name'] for f in self.conn.list(addom)]
self.assertIn('scripts', ls,
msg='"scripts" directory not found in sysvol')
- self.assertIn('Policies',ls,
+ self.assertIn('Policies', ls,
msg='"Policies" directory not found in sysvol')
def test_unlink(self):
from samba import param
from samba.credentials import Credentials
from samba.auth import system_session
-from samba.provision import getpolicypath,find_provision_key_parameters
+from samba.provision import getpolicypath, find_provision_key_parameters
from samba.upgradehelpers import (get_paths, get_ldbs,
identic_rename,
updateOEMInfo, getOEMInfo, update_gpo,
creds.guess(samba3.lp)
creds.set_bind_dn(ldapuser)
creds.set_password(ldappass)
- urls = samba3.lp.get("passdb backend").split(":",1)[1].strip('"')
+ urls = samba3.lp.get("passdb backend").split(":", 1)[1].strip('"')
for url in urls.split():
try:
ldb_object = Ldb(url, credentials=creds)
"objectCategory", "distinguishedName", "nTMixedDomain",
"showInAdvancedViewOnly", "instanceType", "msDS-Behavior-Version",
"nextRid", "cn", "versionNumber", "lmPwdHistory", "pwdLastSet",
- "ntPwdHistory", "unicodePwd","dBCSPwd", "supplementalCredentials",
- "gPCUserExtensionNames", "gPCMachineExtensionNames","maxPwdAge", "secret",
+ "ntPwdHistory", "unicodePwd", "dBCSPwd", "supplementalCredentials",
+ "gPCUserExtensionNames", "gPCMachineExtensionNames", "maxPwdAge", "secret",
"possibleInferiors", "privilege", "sAMAccountType"])
# policy guid
res = samdb.search(expression="(displayName=Default Domain Policy)",
base="CN=Policies,CN=System," + str(names.rootdn),
- scope=SCOPE_ONELEVEL, attrs=["cn","displayName"])
- names.policyid = str(res[0]["cn"]).replace("{","").replace("}","")
+ scope=SCOPE_ONELEVEL, attrs=["cn", "displayName"])
+ names.policyid = str(res[0]["cn"]).replace("{", "").replace("}", "")
# dc policy guid
res2 = samdb.search(expression="(displayName=Default Domain Controllers"
" Policy)",
base="CN=Policies,CN=System," + str(names.rootdn),
- scope=SCOPE_ONELEVEL, attrs=["cn","displayName"])
+ scope=SCOPE_ONELEVEL, attrs=["cn", "displayName"])
if len(res2) == 1:
- names.policyid_dc = str(res2[0]["cn"]).replace("{","").replace("}","")
+ names.policyid_dc = str(res2[0]["cn"]).replace("{", "").replace("}", "")
else:
names.policyid_dc = None
return ret
else:
if i == minimum - 1:
- assert len1 != len2,"PB PB PB" + " ".join(tab1) + " / " + " ".join(tab2)
+ assert len1 != len2, "PB PB PB" + " ".join(tab1) + " / " + " ".join(tab2)
if len1 > len2:
return 1
else:
have
"""
entry = samdb.search(expression='(objectClass=user)',
- base=ldb.Dn(samdb,str(rootdn)),
+ base=ldb.Dn(samdb, str(rootdn)),
scope=SCOPE_SUBTREE, attrs=["msDs-KeyVersionNumber"],
controls=["search_options:1:2"])
done = 0
if len(attrs) > 0:
expr = "(|"
for att in attrs:
- expr = "%s(%s=*)" %(expr,att)
+ expr = "%s(%s=*)" %(expr, att)
expr = "%s)" %expr
return expr
return hashAtt
entry = samdb.search(expression=expr, base=ldb.Dn(samdb, str(rootdn)),
scope=SCOPE_SUBTREE, attrs=attrs,
- controls=["search_options:1:2","bypassoperational:0"])
+ controls=["search_options:1:2", "bypassoperational:0"])
if len(entry) == 0:
# Nothing anymore
return hashAtt
print("To track the USNs modified/created by provision and upgrade proivsion,")
print(" the following ranges are proposed to be added to your provision sam.ldb: \n%s" % ldif)
print("We recommend to review them, and if it's correct to integrate the following ldif: %s in your sam.ldb" % file)
- print("You can load this file like this: ldbadd -H %s %s\n" %(str(samdb_path),file))
+ print("You can load this file like this: ldbadd -H %s %s\n" %(str(samdb_path), file))
ldif = "dn: @PROVISION\nprovisionnerID: %s\n%s" % (invocationid, ldif)
- open(file,'w').write(ldif)
+ open(file, 'w').write(ldif)
def int64range2str(value):
"""Display the int64 range stored in value as xxx-yyy
if name == "":
if have_swat:
start_response('301 Redirect',
- [('Location', urljoin(application_uri(environ), 'swat')),])
+ [('Location', urljoin(application_uri(environ), 'swat')), ])
return []
else:
return render_placeholder(environ, start_response)
# released under GNU GPL v3 or later
from __future__ import print_function
-from subprocess import call, check_call,Popen, PIPE
+from subprocess import call, check_call, Popen, PIPE
import os, tarfile, sys, time
from optparse import OptionParser
import smtplib
"samba-test-only": [("configure", "./configure.developer --with-selftest-prefix=./bin/ab --abi-check-disable" + samba_configure_params, "text/plain"),
("make", "make -j", "text/plain"),
- ("test", 'make test FAIL_IMMEDIATELY=1 TESTS="${TESTS}"',"text/plain")],
+ ("test", 'make test FAIL_IMMEDIATELY=1 TESTS="${TESTS}"', "text/plain")],
# Test cross-compile infrastructure
"samba-xc": [("random-sleep", "script/random-sleep.sh 60 600", "text/plain"),
parser.add_option("", "--bad", help="known bad revision (default HEAD)", default='HEAD')
parser.add_option("", "--skip-build-errors", help="skip revision where make fails",
action='store_true', default=False)
-parser.add_option("", "--autogen", help="run autogen before each build",action="store_true", default=False)
+parser.add_option("", "--autogen", help="run autogen before each build", action="store_true", default=False)
parser.add_option("", "--autogen-command", help="command to use for autogen (default ./autogen.sh)",
type='str', default="./autogen.sh")
parser.add_option("", "--configure", help="run configure.developer before each build",
lines.append(line)
f.close()
if base_fname:
- diff = list(difflib.unified_diff(base_lines,lines,base_fname,fname))
+ diff = list(difflib.unified_diff(base_lines, lines, base_fname, fname))
if diff:
print('configuration files %s and %s do not match' % (base_fname, fname))
for l in diff:
return exitcode
-class SubunitOps(TestProtocolClient,TestsuiteEnabledTestResult):
+class SubunitOps(TestProtocolClient, TestsuiteEnabledTestResult):
def progress(self, count, whence):
if whence == subunit.PROGRESS_POP:
#!/usr/bin/env python
from __future__ import print_function
-import sys,os,subprocess
+import sys, os, subprocess
if len(sys.argv) != 3:
#
# RENAME-ACCESS needs to run against a special share - acl_xattr_ign_sysacl_windows
#
-plantestsuite("samba3.smbtorture_s3.plain(nt4_dc).%s" % "RENAME-ACCESS","nt4_dc", [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), "RENAME-ACCESS", '//$SERVER_IP/acl_xattr_ign_sysacl_windows', '$USERNAME', '$PASSWORD', smbtorture3, "", "-l $LOCAL_PATH"])
+plantestsuite("samba3.smbtorture_s3.plain(nt4_dc).%s" % "RENAME-ACCESS", "nt4_dc", [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), "RENAME-ACCESS", '//$SERVER_IP/acl_xattr_ign_sysacl_windows', '$USERNAME', '$PASSWORD', smbtorture3, "", "-l $LOCAL_PATH"])
plantestsuite("samba3.smbtorture_s3.crypt_client(nt4_dc).%s" % "RENAME-ACCESS", "nt4_dc", [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), "RENAME-ACCESS", '//$SERVER_IP/acl_xattr_ign_sysacl_windows', '$USERNAME', '$PASSWORD', smbtorture3, "-e", "-l $LOCAL_PATH"])
# non-crypt only
def uniq_list(alist):
"""return a unique list"""
set = {}
- return [set.setdefault(e,e) for e in alist if e not in set]
+ return [set.setdefault(e, e) for e in alist if e not in set]
lp_ctx = sambaopts.get_loadparm()
res = classinfo[oc]["subClassOf"]
for r in res:
list.append(r)
- list.extend(supclasses(classinfo,r))
+ list.extend(supclasses(classinfo, r))
classinfo[oc]["SUPCLASSES"] = list
return list
print("Returned incorrect list for objectclass %s" % oc)
print("search: %s" % poss1)
print("constructed: %s" % poss2)
- for i in range(0,min(len(poss1),len(poss2))):
+ for i in range(0, min(len(poss1), len(poss2))):
print("%30s %30s" % (poss1[i], poss2[i]))
print("]")
sys.exit(1)
if objectclass is None:
for oc in get_object_classes(db):
- test_class(db,classinfo,oc)
+ test_class(db, classinfo, oc)
else:
- test_class(db,classinfo,objectclass)
+ test_class(db, classinfo, objectclass)
print("Lists match OK")
#u3 is member of administrators group, should be able to read sd
res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
SCOPE_BASE, None, ["nTSecurityDescriptor"])
- self.assertEqual(len(res),1)
+ self.assertEqual(len(res), 1)
self.assertTrue("nTSecurityDescriptor" in res[0].keys())
class AclUndeleteTests(AclTests):
self.assertEquals(len(res), 1)
return res[0]
- def search_dn(self,dn):
+ def search_dn(self, dn):
print("SEARCH by DN %s" % dn)
res = self.ldb.search(expression="(objectClass=*)",
def del_attr_values(self, delObj):
print("Checking attributes for %s" % delObj["dn"])
- self.assertEquals(delObj["isDeleted"][0],"TRUE")
+ self.assertEquals(delObj["isDeleted"][0], "TRUE")
self.assertTrue(not("objectCategory" in delObj))
self.assertTrue(not("sAMAccountType" in delObj))
guid = None
for e in res:
if str(e["name"]) == "testou3":
- guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+ guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
ctl = str(res.controls[0]).split(":")
ctl[1] = "1"
expression="(objectClass=organizationalUnit)",
controls=[control1])
self.assertEqual(len(res), 1)
- guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertTrue(res[0].get("isDeleted"))
self.assertTrue(res[0].get("name") != None)
guid = None
for e in res:
if str(e["name"]) == "testou3":
- guid = str(ndr_unpack(misc.GUID,e.get("objectGUID")[0]))
+ guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
self.assertTrue(guid != None)
ctl = str(res.controls[0]).split(":")
expression="(objectClass=organizationalUnit)",
controls=[control1])
self.assertEqual(len(res), 1)
- guid2 = str(ndr_unpack(misc.GUID,res[0].get("objectGUID")[0]))
+ guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
self.assertEqual(guid2, guid)
self.assertEqual(str(res[0].dn), "")
adminDescription: """ + attr_name + """
adminDisplayName: """ + attr_name + """
cn: """ + attr_name + """
-attributeId: 1.3.6.1.4.1.7165.4.6.1.7.%d.""" % sub_oid + str(random.randint(1,100000)) + """
+attributeId: 1.3.6.1.4.1.7165.4.6.1.7.%d.""" % sub_oid + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.12
omSyntax: 64
instanceType: 4
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.7.%d.""" % sub_oid + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.7.%d.""" % sub_oid + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalPerson
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["sAMAccountName"] = MessageElement(["nam1","nam2"], FLAG_MOD_REPLACE,
+ m["sAMAccountName"] = MessageElement(["nam1", "nam2"], FLAG_MOD_REPLACE,
"sAMAccountName")
try:
ldb.modify(m)
rdn = "CN=a012345678901234567890123456789012345678901234567890123456789012"
delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
ldif = """
-dn: %s,%s""" % (rdn,self.base_dn) + """
+dn: %s,%s""" % (rdn, self.base_dn) + """
objectClass: container
"""
self.ldb.add_ldif(ldif)
delete_force(self.ldb, "%s,%s" % (rdn, self.base_dn))
try:
ldif = """
-dn: %s,%s""" % (rdn,self.base_dn) + """
+dn: %s,%s""" % (rdn, self.base_dn) + """
objectClass: container
"""
self.ldb.add_ldif(ldif)
"""Test parentGUID behaviour"""
self.ldb.add({
"dn": "cn=parentguidtest,cn=users," + self.base_dn,
- "objectclass":"user",
- "samaccountname":"parentguidtest"})
+ "objectclass": "user",
+ "samaccountname": "parentguidtest"})
res1 = ldb.search(base="cn=parentguidtest,cn=users," + self.base_dn, scope=SCOPE_BASE,
attrs=["parentGUID", "samaccountname"])
- res2 = ldb.search(base="cn=users," + self.base_dn,scope=SCOPE_BASE,
+ res2 = ldb.search(base="cn=users," + self.base_dn, scope=SCOPE_BASE,
attrs=["objectGUID"])
res3 = ldb.search(base=self.base_dn, scope=SCOPE_BASE,
attrs=["parentGUID"])
self.ldb.add({
"dn": "cn=testotherusers," + self.base_dn,
- "objectclass":"container"})
- res1 = ldb.search(base="cn=testotherusers," + self.base_dn,scope=SCOPE_BASE,
+ "objectclass": "container"})
+ res1 = ldb.search(base="cn=testotherusers," + self.base_dn, scope=SCOPE_BASE,
attrs=["objectGUID"])
ldb.rename("cn=parentguidtest,cn=users," + self.base_dn,
"cn=parentguidtest,cn=testotherusers," + self.base_dn)
# there does not seem to be another limitation.
try:
dshstr = ""
- for i in range(1,11):
+ for i in range(1, 11):
# This is in the range
self.ldb.set_dsheuristics(dshstr + "x")
self.ldb.set_dsheuristics(dshstr + "xxxxx")
def test_schemaUpdateNow(self):
"""Testing schemaUpdateNow"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
# Search for created attribute
res = []
res = self.ldb.search("cn=%s,%s" % (attr_name, self.schema_dn), scope=SCOPE_BASE,
- attrs=["lDAPDisplayName","schemaIDGUID", "msDS-IntID"])
+ attrs=["lDAPDisplayName", "schemaIDGUID", "msDS-IntID"])
self.assertEquals(len(res), 1)
self.assertEquals(res[0]["lDAPDisplayName"][0], attr_ldap_display_name)
self.assertTrue("schemaIDGUID" in res[0])
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.6.1.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.6.1.""" + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalPerson
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.6.2.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.6.2.""" + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalPerson
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.6.3.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.6.3.""" + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalUnit
def test_duplicate_attributeID(self):
"""Testing creating a duplicate attribute"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.2." + rand
def test_duplicate_attributeID_governsID(self):
"""Testing creating a duplicate attribute and class"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.3." + rand
def test_duplicate_cn(self):
"""Testing creating a duplicate attribute"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.4." + rand
def test_duplicate_implicit_ldapdisplayname(self):
"""Testing creating a duplicate attribute ldapdisplayname"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.5." + rand
def test_duplicate_explicit_ldapdisplayname(self):
"""Testing creating a duplicate attribute ldapdisplayname"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.6." + rand
def test_duplicate_explicit_ldapdisplayname_with_class(self):
"""Testing creating a duplicate attribute ldapdisplayname between
and attribute and a class"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.7." + rand
def test_duplicate_via_rename_ldapdisplayname(self):
"""Testing creating a duplicate attribute ldapdisplayname"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.8." + rand
def test_duplicate_via_rename_attributeID(self):
"""Testing creating a duplicate attributeID"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.9." + rand
def test_remove_ldapdisplayname(self):
"""Testing removing the ldapdisplayname"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.10." + rand
def test_rename_ldapdisplayname(self):
"""Testing renaming ldapdisplayname"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.11." + rand
def test_change_attributeID(self):
"""Testing change the attributeID"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.12." + rand
def test_change_attributeID_same(self):
"""Testing change the attributeID to the same value"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name = "test-Attr" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name = attr_name.replace("-", "")
attributeID = "1.3.6.1.4.1.7165.4.6.1.6.13." + rand
if dc_level < DS_DOMAIN_FUNCTION_2003:
return
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name_1 = "test-generated-linkID" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name_1 = attr_name_1.replace("-", "")
duplicate mAPIIDs.
"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
attr_name_1 = "test-generated-mAPIID" + time.strftime("%s", time.gmtime()) + "-" + rand
attr_ldap_display_name_1 = attr_name_1.replace("-", "")
def test_change_governsID(self):
"""Testing change the governsID"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
class_name = "test-Class" + time.strftime("%s", time.gmtime()) + "-" + rand
class_ldap_display_name = class_name.replace("-", "")
governsID = "1.3.6.1.4.1.7165.4.6.2.6.5." + rand
def test_change_governsID_same(self):
"""Testing change the governsID"""
- rand = str(random.randint(1,100000))
+ rand = str(random.randint(1, 100000))
class_name = "test-Class" + time.strftime("%s", time.gmtime()) + "-" + rand
class_ldap_display_name = class_name.replace("-", "")
governsID = "1.3.6.1.4.1.7165.4.6.2.6.6." + rand
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.6.7.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.6.7.""" + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalUnit
adminDescription: """ + attr_name + """
adminDisplayName: """ + attr_name + """
cn: """ + attr_name + """
-attributeId: 1.3.6.1.4.1.7165.4.6.1.6.14.""" + str(random.randint(1,100000)) + """
+attributeId: 1.3.6.1.4.1.7165.4.6.1.6.14.""" + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.12
omSyntax: 64
instanceType: 4
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.6.%d.""" % sub_oid + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.6.%d.""" % sub_oid + str(random.randint(1, 100000)) + """
instanceType: 4
objectClassCategory: 1
subClassOf: organizationalPerson
def test_objectClass_computer(self):
res = self.ldb.search(self.base_dn, expression="objectClass=computer",
- attrs=["serverReferenceBL","msDS-isRODC"], controls=["search_options:1:2"])
+ attrs=["serverReferenceBL", "msDS-isRODC"], controls=["search_options:1:2"])
for ldb_msg in res:
if "serverReferenceBL" not in ldb_msg:
print("Computer entry %s doesn't have a serverReferenceBL attribute" % ldb_msg['dn'])
objectClass: top
objectClass: attributeSchema
cn: """ + attr_name + """
-attributeId: 1.3.6.1.4.1.7165.4.6.1.1.""" + str(random.randint(1,100000)) + """
+attributeId: 1.3.6.1.4.1.7165.4.6.1.1.""" + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.14
omSyntax: 127
omObjectClass: \x2A\x86\x48\x86\xF7\x14\x01\x01\x01\x0C
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.1.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.1.""" + str(random.randint(1, 100000)) + """
schemaIdGuid: """ + str(uuid.uuid4()) + """
objectClassCategory: 1
subClassOf: organizationalPerson
objectClass: top
objectClass: attributeSchema
cn: """ + attr_name + """
-attributeId: 1.3.6.1.4.1.7165.4.6.1.2.""" + str(random.randint(1,100000)) + """
+attributeId: 1.3.6.1.4.1.7165.4.6.1.2.""" + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.7
omSyntax: 127
omObjectClass: \x2A\x86\x48\x86\xF7\x14\x01\x01\x01\x0B
adminDescription: """ + class_name + """
adminDisplayName: """ + class_name + """
cn: """ + class_name + """
-governsId: 1.3.6.1.4.1.7165.4.6.2.2.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.2.""" + str(random.randint(1, 100000)) + """
schemaIdGuid: """ + str(uuid.uuid4()) + """
objectClassCategory: 1
subClassOf: organizationalPerson
# removing a duplicate link in the same message should fail
self.add_linked_attribute(g2, [u1, u2])
self.assertRaises(ldb.LdbError,
- self.remove_linked_attribute,g2, [u1, u1])
+ self.remove_linked_attribute, g2, [u1, u1])
def _test_la_links_delete_link_reveal(self):
u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
'rodc', 'preload',
user_dn,
credstring,
- '--server', RWDC,]
+ '--server', RWDC, ]
print(' '.join(cmd))
subprocess.check_call(cmd)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["description"] = MessageElement(["desc1","desc2"], FLAG_MOD_REPLACE,
+ m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE,
"description")
try:
ldb.modify(m)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["description"] = MessageElement(["desc1","desc2"], FLAG_MOD_DELETE,
+ m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE,
"description")
ldb.modify(m)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["description"] = MessageElement(["desc1","desc2"], FLAG_MOD_DELETE,
+ m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_DELETE,
"description")
try:
ldb.modify(m)
m = Message()
m.dn = Dn(ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- m["description"] = MessageElement(["desc1","desc2"], FLAG_MOD_REPLACE,
+ m["description"] = MessageElement(["desc1", "desc2"], FLAG_MOD_REPLACE,
"description")
try:
ldb.modify(m)
# a list of some well-known sids
# objects in Builtin are aready covered by objectclass
protected_list = [
- ["CN=Domain Admins","CN=Users,"],
- ["CN=Schema Admins","CN=Users,"],
- ["CN=Enterprise Admins","CN=Users,"],
- ["CN=Administrator","CN=Users,"],
- ["CN=Domain Controllers","CN=Users,"],
+ ["CN=Domain Admins", "CN=Users,"],
+ ["CN=Schema Admins", "CN=Users,"],
+ ["CN=Enterprise Admins", "CN=Users,"],
+ ["CN=Administrator", "CN=Users,"],
+ ["CN=Domain Controllers", "CN=Users,"],
]
"logonCount": MessageElement(["0"]),
"cn": MessageElement([user_name]),
"countryCode": MessageElement(["0"]),
- "objectClass": MessageElement(["top","person","organizationalPerson","user"]),
+ "objectClass": MessageElement(["top", "person", "organizationalPerson", "user"]),
"instanceType": MessageElement(["4"]),
"distinguishedName": MessageElement([user_dn]),
"sAMAccountType": MessageElement(["805306368"]),
def create_schema_class(self, _ldb, desc=None):
while True:
- class_id = random.randint(0,65535)
+ class_id = random.randint(0, 65535)
class_name = "descriptor-test-class%s" % class_id
class_dn = "CN=%s,%s" % (class_name, self.schema_dn)
try:
["testuser1", "testuser5", "testuser6", "testuser8"],
add_members_operation=True)
self.ldb_admin.add_remove_group_members("Domain Admins",
- ["testuser2","testuser5","testuser6","testuser7"],
+ ["testuser2", "testuser5", "testuser6", "testuser7"],
add_members_operation=True)
self.ldb_admin.add_remove_group_members("Schema Admins",
- ["testuser3","testuser6","testuser7","testuser8"],
+ ["testuser3", "testuser6", "testuser7", "testuser8"],
add_members_operation=True)
self.results = {
sd_sddl = "O:BAG:BAD:P(A;CI;0x000f01ff;;;AU)"
sd = security.descriptor.from_sddl(sd_sddl, self.domain_sid)
- self.ldb_admin.create_ou(self.ou_dn,sd=sd)
+ self.ldb_admin.create_ou(self.ou_dn, sd=sd)
self.ldb_admin.create_ou(self.sub_dn)
ou_res0 = self.sd_utils.ldb.search(self.ou_dn, SCOPE_BASE,
"""Test if the urgent replication is not activated when handling a non urgent object."""
self.ldb.add({
"dn": "cn=nonurgenttest,cn=users," + self.base_dn,
- "objectclass":"user",
- "samaccountname":"nonurgenttest",
- "description":"nonurgenttest description"})
+ "objectclass": "user",
+ "samaccountname": "nonurgenttest",
+ "description": "nonurgenttest description"})
# urgent replication should not be enabled when creating
res = self.ldb.load_partition_usn(self.base_dn)
self.ldb.add({
"dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
self.ldb.get_config_basedn(),
- "objectclass":"server",
- "cn":"test server",
- "name":"test server",
- "systemFlags":"50000000"}, ["relax:0"])
+ "objectclass": "server",
+ "cn": "test server",
+ "name": "test server",
+ "systemFlags": "50000000"}, ["relax:0"])
self.ldb.add_ldif(
"""dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
-attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1,100000)) + """
+attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1, 100000)) + """
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
cn: test classSchema
instanceType: 4
subClassOf: top
-governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1,100000)) + """
+governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1, 100000)) + """
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
self.ldb.add({
"dn": "cn=test secret,cn=System," + self.base_dn,
- "objectClass":"secret",
- "cn":"test secret",
- "name":"test secret",
- "currentValue":"xxxxxxx"})
+ "objectClass": "secret",
+ "cn": "test secret",
+ "name": "test secret",
+ "currentValue": "xxxxxxx"})
# urgent replication should be enabled when creating
res = self.ldb.load_partition_usn(self.base_dn)
self.ldb.add({
"dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
- "objectclass":"user",
- "samaccountname":"user UrgAttr test",
- "userAccountControl":str(dsdb.UF_NORMAL_ACCOUNT),
- "lockoutTime":"0",
- "pwdLastSet":"0",
- "description":"urgent attributes test description"})
+ "objectclass": "user",
+ "samaccountname": "user UrgAttr test",
+ "userAccountControl": str(dsdb.UF_NORMAL_ACCOUNT),
+ "lockoutTime": "0",
+ "pwdLastSet": "0",
+ "description": "urgent attributes test description"})
# urgent replication should NOT be enabled when creating
res = self.ldb.load_partition_usn(self.base_dn)
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.dsdb import UF_SCRIPT, UF_ACCOUNTDISABLE, UF_00000004, UF_HOMEDIR_REQUIRED, \
- UF_LOCKOUT,UF_PASSWD_NOTREQD, UF_PASSWD_CANT_CHANGE, UF_ENCRYPTED_TEXT_PASSWORD_ALLOWED,\
+ UF_LOCKOUT, UF_PASSWD_NOTREQD, UF_PASSWD_CANT_CHANGE, UF_ENCRYPTED_TEXT_PASSWORD_ALLOWED,\
UF_TEMP_DUPLICATE_ACCOUNT, UF_NORMAL_ACCOUNT, UF_00000400, UF_INTERDOMAIN_TRUST_ACCOUNT, \
UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT, UF_00004000, \
UF_00008000, UF_DONT_EXPIRE_PASSWD, UF_MNS_LOGON_ACCOUNT, UF_SMARTCARD_REQUIRED, \
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
bits = [UF_SCRIPT, UF_ACCOUNTDISABLE, UF_00000004, UF_HOMEDIR_REQUIRED,
- UF_LOCKOUT,UF_PASSWD_NOTREQD, UF_PASSWD_CANT_CHANGE,
+ UF_LOCKOUT, UF_PASSWD_NOTREQD, UF_PASSWD_CANT_CHANGE,
UF_ENCRYPTED_TEXT_PASSWORD_ALLOWED,
UF_TEMP_DUPLICATE_ACCOUNT, UF_NORMAL_ACCOUNT, UF_00000400,
UF_INTERDOMAIN_TRUST_ACCOUNT,
_swig_property = property
except NameError:
pass # Python < 2.2 doesn't have 'property'.
-def _swig_setattr_nondynamic(self,class_type,name,value,static=1):
+def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
if (name == "thisown"): return self.this.own(value)
if (name == "this"):
if type(value).__name__ == 'PySwigObject':
self.__dict__[name] = value
return
- method = class_type.__swig_setmethods__.get(name,None)
- if method: return method(self,value)
- if (not static) or hasattr(self,name):
+ method = class_type.__swig_setmethods__.get(name, None)
+ if method: return method(self, value)
+ if (not static) or hasattr(self, name):
self.__dict__[name] = value
else:
raise AttributeError("You cannot add attributes to %s" % self)
-def _swig_setattr(self,class_type,name,value):
- return _swig_setattr_nondynamic(self,class_type,name,value,0)
+def _swig_setattr(self, class_type, name, value):
+ return _swig_setattr_nondynamic(self, class_type, name, value, 0)
-def _swig_getattr(self,class_type,name):
+def _swig_getattr(self, class_type, name):
if (name == "thisown"): return self.this.own()
- method = class_type.__swig_getmethods__.get(name,None)
+ method = class_type.__swig_getmethods__.get(name, None)
if method: return method(self)
raise AttributeError(name)
def _swig_setattr_nondynamic_method(set):
- def set_attr(self,name,value):
+ def set_attr(self, name, value):
if (name == "thisown"): return self.this.own(value)
- if hasattr(self,name) or (name == "this"):
- set(self,name,value)
+ if hasattr(self, name) or (name == "this"):
+ set(self, name, value)
else:
raise AttributeError("You cannot add attributes to %s" % self)
return set_attr
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
def __init__(self, *args, **kwargs):
- _wmi.IUnknown_swiginit(self,_wmi.new_IUnknown(*args, **kwargs))
+ _wmi.IUnknown_swiginit(self, _wmi.new_IUnknown(*args, **kwargs))
__swig_destroy__ = _wmi.delete_IUnknown
-IUnknown.Release = new_instancemethod(_wmi.IUnknown_Release,None,IUnknown)
+IUnknown.Release = new_instancemethod(_wmi.IUnknown_Release, None, IUnknown)
IUnknown_swigregister = _wmi.IUnknown_swigregister
IUnknown_swigregister(IUnknown)
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
def __init__(self, *args, **kwargs):
- _wmi.IWbemServices_swiginit(self,_wmi.new_IWbemServices(*args, **kwargs))
+ _wmi.IWbemServices_swiginit(self, _wmi.new_IWbemServices(*args, **kwargs))
__swig_destroy__ = _wmi.delete_IWbemServices
-IWbemServices.ExecQuery = new_instancemethod(_wmi.IWbemServices_ExecQuery,None,IWbemServices)
-IWbemServices.ExecNotificationQuery = new_instancemethod(_wmi.IWbemServices_ExecNotificationQuery,None,IWbemServices)
-IWbemServices.CreateInstanceEnum = new_instancemethod(_wmi.IWbemServices_CreateInstanceEnum,None,IWbemServices)
+IWbemServices.ExecQuery = new_instancemethod(_wmi.IWbemServices_ExecQuery, None, IWbemServices)
+IWbemServices.ExecNotificationQuery = new_instancemethod(_wmi.IWbemServices_ExecNotificationQuery, None, IWbemServices)
+IWbemServices.CreateInstanceEnum = new_instancemethod(_wmi.IWbemServices_CreateInstanceEnum, None, IWbemServices)
IWbemServices_swigregister = _wmi.IWbemServices_swigregister
IWbemServices_swigregister(IWbemServices)
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
def __init__(self, *args, **kwargs):
- _wmi.IEnumWbemClassObject_swiginit(self,_wmi.new_IEnumWbemClassObject(*args, **kwargs))
+ _wmi.IEnumWbemClassObject_swiginit(self, _wmi.new_IEnumWbemClassObject(*args, **kwargs))
__swig_destroy__ = _wmi.delete_IEnumWbemClassObject
-IEnumWbemClassObject.Reset = new_instancemethod(_wmi.IEnumWbemClassObject_Reset,None,IEnumWbemClassObject)
+IEnumWbemClassObject.Reset = new_instancemethod(_wmi.IEnumWbemClassObject_Reset, None, IEnumWbemClassObject)
IEnumWbemClassObject_swigregister = _wmi.IEnumWbemClassObject_swigregister
IEnumWbemClassObject_swigregister(IEnumWbemClassObject)
continue
self.global_objs = {}
-def attid_equal(a1,a2):
+def attid_equal(a1, a2):
return (a1 & 0xffffffff) == (a2 & 0xffffffff)
########### main code ###########
def find_domain_sid(self, ldb):
res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def setUp(self):
super(SpeedTest, self).setUp()
bindoptions = ''
if t == 'rpc.cracknames':
bindoptions = 'seal'
- plansmbtorture4testsuite(t, env, ["%s:$SERVER[%s]" % (transport,bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN'], "samba4.%s on %s with %s" % (t, transport, bindoptions))
+ plansmbtorture4testsuite(t, env, ["%s:$SERVER[%s]" % (transport, bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN'], "samba4.%s on %s with %s" % (t, transport, bindoptions))
# Tests for the DFS referral calls implementation
for t in smbtorture4_testsuites("dfs."):
if env == "s4member":
users = ["$DOMAIN/$DC_USERNAME", "$DC_USERNAME@$REALM"]
for usr in users:
- plantestsuite("samba4.winbind.dom_name_parse.cmd", env, "%s/dom_parse.sh %s %s" % (bbdir,cmd,usr))
+ plantestsuite("samba4.winbind.dom_name_parse.cmd", env, "%s/dom_parse.sh %s %s" % (bbdir, cmd, usr))
nsstest4 = binpath("nsstest")
for env in ["ad_dc:local", "s4member:local", "nt4_dc:local", "ad_member:local", "nt4_member:local"]:
name_orig = obj_orig["name"][0]
name_cur = user_cur["name"][0]
if is_deleted:
- self.assertEquals(user_cur["isDeleted"][0],"TRUE")
+ self.assertEquals(user_cur["isDeleted"][0], "TRUE")
self.assertFalse("objectCategory" in user_cur)
self.assertFalse("sAMAccountType" in user_cur)
self.assertFalse("description" in user_cur)
(result, out, err) = self.runsubcmd(*samba_tool_cmdline)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
def _enable_inbound_repl(self, DC):
# make base command line
samba_tool_cmd += [DC, "--dsa-option=-DISABLE_INBOUND_REPL"]
(result, out, err) = self.runsubcmd(*samba_tool_cmd)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
def _disable_inbound_repl(self, DC):
# make base command line
samba_tool_cmd += [DC, "--dsa-option=+DISABLE_INBOUND_REPL"]
(result, out, err) = self.runsubcmd(*samba_tool_cmd)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
def _enable_all_repl(self, DC):
self._enable_inbound_repl(DC)
samba_tool_cmd += [DC, "--dsa-option=-DISABLE_OUTBOUND_REPL"]
(result, out, err) = self.runsubcmd(*samba_tool_cmd)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
def _disable_all_repl(self, DC):
self._disable_inbound_repl(DC)
samba_tool_cmd += [DC, "--dsa-option=+DISABLE_OUTBOUND_REPL"]
(result, out, err) = self.runsubcmd(*samba_tool_cmd)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
def _get_highest_hwm_utdv(self, ldb_conn):
res = ldb_conn.search("", scope=ldb.SCOPE_BASE, attrs=["highestCommittedUSN"])
cmd_line_auth)
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
if noop == False:
self.assertTrue("FSMO transfer of '%s' role successful" % role in out)
else:
})
dc3_id = self._get_identifier(self.ldb_dc1, dc3)
- (hwm1, utdv1) = self._check_replication([ou1,ou2,dc3],
+ (hwm1, utdv1) = self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
m["displayName"] = ldb.MessageElement("OU1", ldb.FLAG_MOD_ADD, "displayName")
self.ldb_dc1.modify(m)
- (hwm2, utdv2) = self._check_replication([ou2,dc3,ou1],
+ (hwm2, utdv2) = self._check_replication([ou2, dc3, ou1],
drsuapi.DRSUAPI_DRS_WRIT_REP)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
m["displayName"] = ldb.MessageElement("OU2", ldb.FLAG_MOD_ADD, "displayName")
self.ldb_dc1.modify(m)
- (hwm3, utdv3) = self._check_replication([dc3,ou1,ou2],
+ (hwm3, utdv3) = self._check_replication([dc3, ou1, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([ou1,ou2,dc3],
+ self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
- self._check_replication([ou1,ou2],
+ self._check_replication([ou1, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP,
highwatermark=hwm1)
- self._check_replication([ou1,ou2],
+ self._check_replication([ou1, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
highwatermark=hwm1)
- self._check_replication([ou1,ou2],
+ self._check_replication([ou1, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
uptodateness_vector=utdv1)
m["displayName"] = ldb.MessageElement("OU", ldb.FLAG_MOD_ADD, "displayName")
self.ldb_dc1.modify(m)
- (hwm4, utdv4) = self._check_replication([dc3,ou1,ou2,self.ou],
+ (hwm4, utdv4) = self._check_replication([dc3, ou1, ou2, self.ou],
drsuapi.DRSUAPI_DRS_WRIT_REP)
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
- self._check_replication([self.ou,ou2],
+ self._check_replication([self.ou, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
uptodateness_vector=utdv2)
})
cn3_id = self._get_identifier(self.ldb_dc1, cn3)
- (hwm5, utdv5) = self._check_replication([dc3,ou1,ou2,self.ou,cn3],
+ (hwm5, utdv5) = self._check_replication([dc3, ou1, ou2, self.ou, cn3],
drsuapi.DRSUAPI_DRS_WRIT_REP)
- self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ self._check_replication([self.ou, ou1, ou2, dc3, cn3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
ou2_id.guid, dc3_id.guid)
- (hwm6, utdv6) = self._check_replication([dc3,ou1,self.ou,cn3,ou2],
+ (hwm6, utdv6) = self._check_replication([dc3, ou1, self.ou, cn3, ou2],
drsuapi.DRSUAPI_DRS_WRIT_REP,
expected_links=[ou2_managedBy_dc3])
# Can fail against Windows due to equal precedence of dc3, cn3
- self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ self._check_replication([self.ou, ou1, ou2, dc3, cn3],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[ou2_managedBy_dc3])
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
dc3_id.guid, ou1_id.guid)
- (hwm7, utdv7) = self._check_replication([ou1,self.ou,cn3,ou2,dc3],
+ (hwm7, utdv7) = self._check_replication([ou1, self.ou, cn3, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP,
- expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1])
+ expected_links=[ou2_managedBy_dc3, dc3_managedBy_ou1])
# Can fail against Windows due to equal precedence of dc3, cn3
#self._check_replication([self.ou,ou1,ou2,dc3,cn3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
expected_links=[dc3_managedBy_ou1])
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1])
drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
dc3_id.guid, ou2_id.guid)
- (hwm8, utdv8) = self._check_replication([ou1,self.ou,cn3,ou2,dc3],
+ (hwm8, utdv8) = self._check_replication([ou1, self.ou, cn3, ou2, dc3],
drsuapi.DRSUAPI_DRS_WRIT_REP,
- expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1,dc3_managedBy_ou2])
+ expected_links=[ou2_managedBy_dc3, dc3_managedBy_ou1, dc3_managedBy_ou2])
# Can fail against Windows due to equal precedence of dc3, cn3
#self._check_replication([self.ou,ou1,ou2,dc3,cn3],
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2])
- self._check_replication([self.ou,ou1,ou2,dc3],
+ self._check_replication([self.ou, ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2])
# GET_TGT will also return any DNs referenced by the linked attributes
# (including the Tombstone attribute)
self._check_replication([ou1, ou2, dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2], dn_ordered=False)
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2], dn_ordered=False)
# Use the highwater-mark prior to changing ManagedBy - this should
# only return the old/Tombstone and new linked attributes (we already
# know all the DNs)
self._check_replication([],
drsuapi.DRSUAPI_DRS_WRIT_REP,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
highwatermark=hwm7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
highwatermark=hwm7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
highwatermark=hwm7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
highwatermark=hwm7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
highwatermark=hwm7)
# Repeat the above set of tests using the uptodateness_vector
# instead of the highwater-mark
self._check_replication([],
drsuapi.DRSUAPI_DRS_WRIT_REP,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
uptodateness_vector=utdv7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
uptodateness_vector=utdv7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
uptodateness_vector=utdv7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
uptodateness_vector=utdv7)
self._check_replication([],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY,
more_flags=drsuapi.DRSUAPI_DRS_GET_TGT,
- expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
+ expected_links=[dc3_managedBy_ou1, dc3_managedBy_ou2],
uptodateness_vector=utdv7)
def test_FSMONotOwner(self):
"cn": class_name,
"lDAPDisplayName": class_ldn,
"governsId": "1.3.6.1.4.1.7165.4.6.2.5." \
- + str((100000 * base_int) + random.randint(1,100000)) + ".1.5.13",
+ + str((100000 * base_int) + random.randint(1, 100000)) + ".1.5.13",
"instanceType": "4",
"objectClassCategory": "%d" % oc_cat,
"subClassOf": "top",
"cn": attr_name,
"lDAPDisplayName": attr_ldn,
"attributeId": "1.3.6.1.4.1.7165.4.6.1.5." \
- + str((100000 * base_int) + random.randint(1,100000)) + ".1.5.13",
+ + str((100000 * base_int) + random.randint(1, 100000)) + ".1.5.13",
"attributeSyntax": "2.5.5.12",
"omSyntax": "64",
"instanceType": "4",
AttributeID_id in Schema cache"""
# add new attributeSchema object
(a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-Link-X", 10,
- attrs={'linkID':"1.2.840.113556.1.2.50",
+ attrs={'linkID': "1.2.840.113556.1.2.50",
"attributeSyntax": "2.5.5.1",
"omSyntax": "127"})
# add a base classSchema class so we can use our new
# Deleted Object base DN
dodn = self._deleted_objects_dn(sam_ldb)
# now check properties of the user
- name_cur = ou_cur["ou"][0]
- self.assertEquals(ou_cur["isDeleted"][0],"TRUE")
+ name_cur = ou_cur["ou"][0]
+ self.assertEquals(ou_cur["isDeleted"][0], "TRUE")
self.assertTrue(not("objectCategory" in ou_cur))
self.assertTrue(dodn in str(ou_cur["dn"]),
"OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
# Deleted Object base DN
dodn = self._deleted_objects_dn(sam_ldb)
# now check properties of the user
- name_cur = ou_cur["ou"][0]
- self.assertEquals(ou_cur["isDeleted"][0],"TRUE")
+ name_cur = ou_cur["ou"][0]
+ self.assertEquals(ou_cur["isDeleted"][0], "TRUE")
self.assertTrue(not("objectCategory" in ou_cur))
self.assertTrue(dodn in str(ou_cur["dn"]),
"OU %s is deleted but it is not located under %s!" % (name_cur, dodn))
(result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
# 3. Assert we get the RID Set
res = new_ldb.search(base=server_ref_dn,
# 4. Seize the RID Manager role
(result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
self.assertCmdSuccess(result, out, err)
- self.assertEquals(err,"","Shouldn't be any error messages")
+ self.assertEquals(err, "", "Shouldn't be any error messages")
# 5. Add a new user (triggers RID set work)
new_ldb.newuser("ridalloctestuser", "P@ssword!")
for v in ['VM', 'HOSTNAME', 'USER', 'PASS', 'SNAPSHOT', 'REALM', 'DOMAIN', 'IP']:
vname = '%s_%s' % (vm, v)
if vname in self.vars:
- self.setvar("%s_%s" % (prefix,v), self.substitute("${%s}" % vname))
+ self.setvar("%s_%s" % (prefix, v), self.substitute("${%s}" % vname))
else:
- self.vars.pop("%s_%s" % (prefix,v), None)
+ self.vars.pop("%s_%s" % (prefix, v), None)
if self.getvar("WIN_REALM"):
self.setvar("WIN_REALM", self.getvar("WIN_REALM").upper())