FLAG_MOD_ADD, "member")
self.ldb.modify(m)
- self.question = 6*(9-2)
+ self.question = 6 * (9 -2)
self.answer = 42
def tearDown(self):
def test_remove_base_components(self):
x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
- x.remove_base_components(len(x)-1)
+ x.remove_base_components(len(x) - 1)
self.assertEqual("dc=foo24", str(x))
def test_parse_ldif(self):
var_end = text.find("}", var_start)
raise Exception("Not all variables substituted: %s" %
- text[var_start:var_end+1])
+ text[var_start:var_end + 1])
def read_and_sub_file(file_name, subst_vars):
raise RuntimeError("Invalid DN %s" % dnstring)
prefix_len = 4 + len(colons[1]) + int(colons[1])
self.prefix = dnstring[0:prefix_len]
- self.binary = self.prefix[3+len(colons[1]):-1]
+ self.binary = self.prefix[3 + len(colons[1]):-1]
self.dnstring = dnstring[prefix_len:]
else:
self.dnstring = dnstring
# check that the dsServiceName is in GUID form
if not 'dsServiceName' in obj:
self.report('ERROR: dsServiceName missing in @ROOTDSE')
- return error_count+1
+ return error_count + 1
if not obj['dsServiceName'][0].startswith('<GUID='):
self.report('ERROR: dsServiceName not in GUID form in @ROOTDSE')
class inf_to_kdc_tdb(gp_ext_setter):
def mins_to_hours(self):
- return '%d' % (int(self.val)/60)
+ return '%d' % (int(self.val) / 60)
def days_to_hours(self):
- return '%d' % (int(self.val)*24)
+ return '%d' % (int(self.val) * 24)
def set_kdc_tdb(self, val):
old_val = self.gp_db.gpostore.get(self.attribute)
prev = apply_log.find('guid[@value="%s"]' % guid)
if prev is None:
item = etree.SubElement(apply_log, 'guid')
- item.attrib['count'] = '%d' % (len(apply_log)-1)
+ item.attrib['count'] = '%d' % (len(apply_log) - 1)
item.attrib['value'] = guid
def apply_log_pop(self):
user_obj = self.gpdb.find('user[@name="%s"]' % self.user)
apply_log = user_obj.find('applylog')
if apply_log is not None:
- ret = apply_log.find('guid[@count="%d"]' % (len(apply_log)-1))
+ ret = apply_log.find('guid[@count="%d"]' % (len(apply_log) - 1))
if ret is not None:
apply_log.remove(ret)
return ret.attrib['value']
def check_safe_path(path):
dirs = re.split('/|\\\\', path)
if 'sysvol' in path:
- dirs = dirs[dirs.index('sysvol')+1:]
+ dirs = dirs[dirs.index('sysvol') + 1:]
if not '..' in dirs:
return os.path.join(*dirs)
raise OSError(path)
def __len__(self):
if "global" in self._lp.services():
- return len(self._lp)-1
+ return len(self._lp) - 1
return len(self._lp)
def keys(self):
# For each r(i) from (0 <= i < |R|-1)
i = 0
- while i < (r_len-1):
+ while i < (r_len - 1):
# Add an edge from r(i) to r(i+1) if r(i) is a full
# replica or r(i+1) is a partial replica
- if not r_list[i].is_partial() or r_list[i+1].is_partial():
- graph_list[i+1].add_edge_from(r_list[i].rep_dsa_dnstr)
+ if not r_list[i].is_partial() or r_list[i +1].is_partial():
+ graph_list[i + 1].add_edge_from(r_list[i].rep_dsa_dnstr)
# Add an edge from r(i+1) to r(i) if r(i+1) is a full
# replica or ri is a partial replica.
- if not r_list[i+1].is_partial() or r_list[i].is_partial():
- graph_list[i].add_edge_from(r_list[i+1].rep_dsa_dnstr)
+ if not r_list[i + 1].is_partial() or r_list[i].is_partial():
+ graph_list[i].add_edge_from(r_list[i + 1].rep_dsa_dnstr)
i = i + 1
# Add an edge from r|R|-1 to r0 if r|R|-1 is a full replica
# or r0 is a partial replica.
- if not r_list[r_len-1].is_partial() or r_list[0].is_partial():
- graph_list[0].add_edge_from(r_list[r_len-1].rep_dsa_dnstr)
+ if not r_list[r_len - 1].is_partial() or r_list[0].is_partial():
+ graph_list[0].add_edge_from(r_list[r_len - 1].rep_dsa_dnstr)
# Add an edge from r0 to r|R|-1 if r0 is a full replica or
# r|R|-1 is a partial replica.
- if not r_list[0].is_partial() or r_list[r_len-1].is_partial():
- graph_list[r_len-1].add_edge_from(r_list[0].rep_dsa_dnstr)
+ if not r_list[0].is_partial() or r_list[r_len -1].is_partial():
+ graph_list[r_len - 1].add_edge_from(r_list[0].rep_dsa_dnstr)
DEBUG("r_list is length %s" % len(r_list))
DEBUG('\n'.join(str((x.rep_dsa_guid, x.rep_dsa_dnstr))
for node in tree:
if node.text and node.text.startswith('|Operation'):
# Strip first and last |
- updates = [x[1:len(x)-1].split('|') for x in
+ updates = [x[1:len(x) - 1].split('|') for x in
ET.tostring(node,method='text').splitlines()]
for update in updates[2:]:
output = re.match('Operation (\d+): {(.*)}', update[0])
class PlainHelpFormatter(optparse.IndentedHelpFormatter):
def format_description(self,description=""):
desc_width = self.width - self.current_indent
- indent = " "*self.current_indent
+ indent = " " * self.current_indent
paragraphs = description.split('\n')
wrapped_paragraphs = [
textwrap.fill(p,
return parser, optiongroups
def message(self, text):
- self.outf.write(text+"\n")
+ self.outf.write(text + "\n")
def _run(self, *argv):
parser, optiongroups = self._create_parser(argv[0])
if not full_new_ou_dn.is_child_of(domain_dn):
full_new_ou_dn.add_base(domain_dn)
new_computer_dn = ldb.Dn(samdb, str(computer_dn))
- new_computer_dn.remove_base_components(len(computer_dn)-1)
+ new_computer_dn.remove_base_components(len(computer_dn) -1)
new_computer_dn.add_base(full_new_ou_dn)
try:
samdb.rename(computer_dn, new_computer_dn)
# 512 bytes and a 2 bytes confounder is required.
#
def random_trust_secret(length):
- pw = samba.generate_random_machine_password(length//2, length//2)
+ pw = samba.generate_random_machine_password(length // 2, length // 2)
return string_to_byte_array(pw.encode('utf-16-le'))
if local_trust_info.trust_direction & lsa.LSA_TRUST_DIRECTION_INBOUND:
(new_parent_dn, e.message))
full_new_group_dn = ldb.Dn(samdb, str(group_dn))
- full_new_group_dn.remove_base_components(len(group_dn)-1)
+ full_new_group_dn.remove_base_components(len(group_dn) - 1)
full_new_group_dn.add_base(full_new_parent_dn)
try:
# when compare content of two DCs. Uncomment for DEBUG purposes.
if self.two_domains and not self.quiet:
self.outf.write("\n* Place-holders for %s:\n" % self.host)
- self.outf.write(4*" " + "${DOMAIN_DN} => %s\n" %
+ self.outf.write(4 * " " + "${DOMAIN_DN} => %s\n" %
self.base_dn)
- self.outf.write(4*" " + "${DOMAIN_NETBIOS} => %s\n" %
+ self.outf.write(4 * " " + "${DOMAIN_NETBIOS} => %s\n" %
self.domain_netbios)
- self.outf.write(4*" " + "${SERVER_NAME} => %s\n" %
+ self.outf.write(4 * " " + "${SERVER_NAME} => %s\n" %
self.server_names)
- self.outf.write(4*" " + "${DOMAIN_NAME} => %s\n" %
+ self.outf.write(4 * " " + "${DOMAIN_NAME} => %s\n" %
self.domain_name)
def find_domain_sid(self):
def diff_1(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
- res += 4*" " + "Difference in ACE count:\n"
- res += 8*" " + "=> %s\n" % len(self.dacl_list)
- res += 8*" " + "=> %s\n" % len(other.dacl_list)
+ res += 4 * " " + "Difference in ACE count:\n"
+ res += 8 * " " + "=> %s\n" % len(self.dacl_list)
+ res += 8 * " " + "=> %s\n" % len(other.dacl_list)
#
i = 0
flag = True
def diff_2(self, other):
res = ""
if len(self.dacl_list) != len(other.dacl_list):
- res += 4*" " + "Difference in ACE count:\n"
- res += 8*" " + "=> %s\n" % len(self.dacl_list)
- res += 8*" " + "=> %s\n" % len(other.dacl_list)
+ res += 4 * " " + "Difference in ACE count:\n"
+ res += 8 * " " + "=> %s\n" % len(self.dacl_list)
+ res += 8 * " " + "=> %s\n" % len(other.dacl_list)
#
common_aces = []
self_aces = []
common_aces.append(ace)
self_aces = sorted(self_aces)
if len(self_aces) > 0:
- res += 4*" " + "ACEs found only in %s:\n" % self.con.host
+ res += 4 *" " + "ACEs found only in %s:\n" % self.con.host
for ace in self_aces:
- res += 8*" " + ace + "\n"
+ res += 8 * " " + ace + "\n"
#
for ace in other_dacl_list_fixed:
try:
common_aces.append(ace)
other_aces = sorted(other_aces)
if len(other_aces) > 0:
- res += 4*" " + "ACEs found only in %s:\n" % other.con.host
+ res += 4 *" " + "ACEs found only in %s:\n" % other.con.host
for ace in other_aces:
- res += 8*" " + ace + "\n"
+ res += 8 * " " + ace + "\n"
#
common_aces = sorted(list(set(common_aces)))
if self.con.verbose:
- res += 4*" " + "ACEs found in both:\n"
+ res += 4 *" " + "ACEs found in both:\n"
for ace in common_aces:
- res += 8*" " + ace + "\n"
+ res += 8 * " " + ace + "\n"
return (self_aces == [] and other_aces == [], res)
class LDAPObject(object):
Log on the screen if there is no --quiet option set
"""
if not self.quiet:
- self.outf.write(msg+"\n")
+ self.outf.write(msg +"\n")
def fix_dn(self, s):
res = "%s" % s
if not self.two_domains:
return res
if res.upper().endswith(self.con.base_dn.upper()):
- res = res[:len(res)-len(self.con.base_dn)] + "${DOMAIN_DN}"
+ res = res[:len(res) - len(self.con.base_dn)] + "${DOMAIN_DN}"
return res
def fix_domain_name(self, s):
other.unique_attrs = []
if self.attributes.keys() != other.attributes.keys():
#
- title = 4*" " + "Attributes found only in %s:" % self.con.host
+ title = 4 * " " + "Attributes found only in %s:" % self.con.host
for x in self.attributes.keys():
if not x in other.attributes.keys() and \
not x.upper() in [q.upper() for q in other.ignore_attributes]:
if title:
res += title + "\n"
title = None
- res += 8*" " + x + "\n"
+ res += 8 * " " + x + "\n"
self.unique_attrs.append(x)
#
- title = 4*" " + "Attributes found only in %s:" % other.con.host
+ title = 4 *" " + "Attributes found only in %s:" % other.con.host
for x in other.attributes.keys():
if not x in self.attributes.keys() and \
not x.upper() in [q.upper() for q in self.ignore_attributes]:
if title:
res += title + "\n"
title = None
- res += 8*" " + x + "\n"
+ res += 8 * " " + x + "\n"
other.unique_attrs.append(x)
#
missing_attrs = [x.upper() for x in self.unique_attrs]
missing_attrs += [x.upper() for x in other.unique_attrs]
- title = 4*" " + "Difference in attribute values:"
+ title = 4 * " " + "Difference in attribute values:"
for x in self.attributes.keys():
if x.upper() in self.ignore_attributes or x.upper() in missing_attrs:
continue
res += title + "\n"
title = None
if p and q:
- res += 8*" " + x + " => \n%s\n%s" % (p, q) + "\n"
+ res += 8 * " " + x + " => \n%s\n%s" % (p, q) + "\n"
else:
- res += 8*" " + x + " => \n%s\n%s" % (self.attributes[x], other.attributes[x]) + "\n"
+ res += 8 * " " + x + " => \n%s\n%s" % (self.attributes[x], other.attributes[x]) + "\n"
self.df_value_attrs.append(x)
#
if self.unique_attrs + other.unique_attrs != []:
while counter < len(self.dn_list) and self.two_domains:
# Use alias reference
tmp = self.dn_list[counter]
- tmp = tmp[:len(tmp)-len(self.con.base_dn)] + "${DOMAIN_DN}"
+ tmp = tmp[:len(tmp) -len(self.con.base_dn)] + "${DOMAIN_DN}"
tmp = tmp.replace("CN=%s" % self.con.domain_netbios, "CN=${DOMAIN_NETBIOS}")
if len(self.con.server_names) == 1:
for x in self.con.server_names:
Log on the screen if there is no --quiet option set
"""
if not self.quiet:
- self.outf.write(msg+"\n")
+ self.outf.write(msg + "\n")
def update_size(self):
self.size = len(self.dn_list)
self.log(title)
title = None
res = False
- self.log(4*" " + x)
+ self.log(4 * " " + x)
self.dn_list[self.dn_list.index(x)] = ""
self.dn_list = [x for x in self.dn_list if x]
#
self.log(title)
title = None
res = False
- self.log(4*" " + x)
+ self.log(4 * " " + x)
other.dn_list[other.dn_list.index(x)] = ""
other.dn_list = [x for x in other.dn_list if x]
#
self.log("\nComparing:")
self.log("'%s' [%s]" % (object1.dn, object1.con.host))
self.log("'%s' [%s]" % (object2.dn, object2.con.host))
- self.log(4*" " + "OK")
+ self.log(4 * " " + "OK")
else:
self.log("\nComparing:")
self.log("'%s' [%s]" % (object1.dn, object1.con.host))
self.log("'%s' [%s]" % (object2.dn, object2.con.host))
self.log(object1.screen_output)
- self.log(4*" " + "FAILED")
+ self.log(4 * " " + "FAILED")
res = False
self.summary = object1.summary
other.summary = object2.summary
#
if self.summary["unique_attrs"]:
self.log("\nAttributes found only in %s:" % self.con.host)
- self.log("".join([str("\n" + 4*" " + x) for x in self.summary["unique_attrs"]]))
+ self.log("".join([str("\n" + 4 * " " + x) for x in self.summary["unique_attrs"]]))
#
if self.summary["df_value_attrs"]:
self.log("\nAttributes with different values:")
- self.log("".join([str("\n" + 4*" " + x) for x in self.summary["df_value_attrs"]]))
+ self.log("".join([str("\n" + 4 * " " + x) for x in self.summary["df_value_attrs"]]))
self.summary["df_value_attrs"] = []
net = Net(creds, lp, server=credopts.ipaddress)
if server_name is None:
server_name = common.netcmd_dnsname(lp)
- self.outf.write(net.time(server_name)+"\n")
+ self.outf.write(net.time(server_name) + "\n")
domain_sid = security.dom_sid(samdb.domain_sid)
except:
raise CommandError("Unable to read domain SID from configuration files")
- self.outf.write(acl.as_sddl(domain_sid)+"\n")
+ self.outf.write(acl.as_sddl(domain_sid) + "\n")
else:
self.outf.write(ndr_print(acl))
s3conf.set("passdb backend", "samba_dsdb:%s" % samdb.url)
LA_sid = security.dom_sid(str(domain_sid)
- + "-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ + "-" +str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
s4_passdb = passdb.PDB(s3conf.get("passdb backend"))
(new_parent_dn, e.message))
full_new_ou_dn = ldb.Dn(samdb, str(full_old_ou_dn))
- full_new_ou_dn.remove_base_components(len(full_old_ou_dn)-1)
+ full_new_ou_dn.remove_base_components(len(full_old_ou_dn) - 1)
full_new_ou_dn.add_base(full_new_parent_dn)
try:
# TODO once I understand how, use the domain info to naildown
# to the correct domain
(cleaneduser, realm, domain) = _get_user_realm_domain(user)
- self.outf.write(cleaneduser+"\n")
+ self.outf.write(cleaneduser + "\n")
res = sam.search(
expression="samaccountname=%s" % ldb.binary_encode(cleaneduser),
scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName"])
credentials=creds, lp=lp)
try:
- samdb.setexpiry(filter, days*24*3600, no_expiry_req=noexpiry)
+ samdb.setexpiry(filter, days * 24 * 3600, no_expiry_req=noexpiry)
except Exception as msg:
# FIXME: Catch more specific exception
raise CommandError("Failed to set expiry for user '%s': %s" % (
digests = ndr_unpack(drsblobs.package_PrimaryWDigestBlob,
primary_wdigest)
try:
- digest = binascii.hexlify(bytearray(digests.hashes[i-1].hash))
+ digest = binascii.hexlify(bytearray(digests.hashes[i - 1].hash))
return "%s:%s:%s" % (user, realm, digest)
except IndexError:
return None
(new_parent_dn, e.message))
full_new_user_dn = ldb.Dn(samdb, str(user_dn))
- full_new_user_dn.remove_base_components(len(user_dn)-1)
+ full_new_user_dn.remove_base_components(len(user_dn) - 1)
full_new_user_dn.add_base(full_new_parent_dn)
try:
url_list = filter(None,self.ol_mmr_urls.split(','))
for url in url_list:
- self.logger.info("Using LDAP-URL: "+url)
+ self.logger.info("Using LDAP-URL: " + url)
if len(url_list) == 1:
raise ProvisioningError("At least 2 LDAP-URLs needed for MMR!")
for i in range(0, len(res)):
expression = ("(&(objectclass=attributeSchema)(linkID=%d)"
"(attributeSyntax=2.5.5.1))" %
- (int(res[i]["linkID"][0])+1))
+ (int(res[i]["linkID"][0]) + 1))
target = schemaldb.searchone(basedn=schemadn,
expression=expression,
attribute="lDAPDisplayName",
hr = ' '.join(["%02X" % x for x in lr])
ll = ll.translate(HEXDUMP_FILTER).decode('utf8')
lr = lr.translate(HEXDUMP_FILTER).decode('utf8')
- result += "[%04X] %-*s %-*s %s %s\n" % (N, 8*3, hl, 8*3, hr, ll, lr)
+ result += "[%04X] %-*s %-*s %s %s\n" % (N, 8 * 3, hl, 8 * 3, hr, ll, lr)
N += 16
return result
self.assertTrue(samba.valid_netbios_name("FOO"))
def test_too_long(self):
- self.assertFalse(samba.valid_netbios_name("FOO"*10))
+ self.assertFalse(samba.valid_netbios_name("FOO" * 10))
def test_invalid_characters(self):
self.assertFalse(samba.valid_netbios_name("*BLA"))
raise TimeoutHelper.Timeout()
-def _make_cmdline(data='$', repeat=5*1024*1024, retcode=0):
+def _make_cmdline(data='$', repeat=5 *1024 *1024, retcode=0):
"""Build a command to call gen_output.py to generate large output"""
return 'gen_output.py --data {} --repeat {} --retcode {}'.format(data, repeat, retcode)
def setUpClass(cls):
good_dns = ["SAMDOM.EXAMPLE.COM",
"1.EXAMPLE.COM",
- "%sEXAMPLE.COM" % ("1."*100),
+ "%sEXAMPLE.COM" % ("1." * 100),
"EXAMPLE",
"\n.COM",
"!@#$%^&*()_",
# actually create these records.
invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
"SAMDOM.EXAMPLE.COM 65536",
- "%s 1" % "A"*256]
+ "%s 1" % "A" *256]
invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
"SAMDOM.EXAMPLE.COM 0 0 65536",
"SAMDOM.EXAMPLE.COM 65536 0 0"]
"""
for record_type_str in self.good_records:
for i in range(1, len(self.good_records[record_type_str])):
- record1 = self.good_records[record_type_str][i-1]
+ record1 = self.good_records[record_type_str][i - 1]
record2 = self.good_records[record_type_str][i]
if record_type_str == 'CNAME':
req = self.generate_alter(call_id=1,
max_xmit_frag=alter_xmit,
max_recv_frag=alter_recv,
- assoc_group_id=0xffffffff-rep.u.assoc_group_id,
+ assoc_group_id=0xffffffff - rep.u.assoc_group_id,
ctx_list=[ctx1])
self.send_pdu(req)
rep = self.recv_pdu()
computer = 'UNKNOWNCOMPUTER'
computer_utf16 = unicode(computer, 'utf-8').encode('utf-16-le')
- real_stub = struct.pack('<IIII', 0x00200000,
- len(server)+1, 0, len(server)+1)
+ real_stub = struct.pack('<IIII', 0x00200000,
+ len(server) + 1, 0, len(server) + 1)
real_stub += server_utf16 + '\x00\x00'
mod_len = len(real_stub) % 4
if mod_len != 0:
real_stub += '\x00' * (4 - mod_len)
real_stub += struct.pack('<III',
- len(computer)+1, 0, len(computer)+1)
+ len(computer) + 1, 0, len(computer) + 1)
real_stub += computer_utf16 + '\x00\x00'
real_stub += '\x11\x22\x33\x44\x55\x66\x77\x88'
# And now try a new request
req2 = self.generate_request(call_id = 2,
- context_id=ctx.context_id-1,
+ context_id=ctx.context_id - 1,
opnum=0,
stub="")
self.send_pdu(req2)
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" *15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 4,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=dcerpc.DCERPC_AUTH_LEVEL_CONNECT,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 1,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_context_id=auth_context_id,
- auth_blob="\x01"+"\x00"*15)
+ auth_blob="\x01" +"\x00" * 15)
req = self.generate_request(call_id = 3,
context_id=ctx1.context_id,
opnum=0,
sig_size = g.sig_size(len(stub_bin))
else:
sig_size = 16
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
stub_bin += '\x00' * auth_pad_length
sig_size = g.sig_size(len(stub_bin))
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
stub_bin += '\x00' * auth_pad_length
sig_size = g.sig_size(len(stub_bin))
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
stub_bin += '\x00' * auth_pad_length
sig_size = g.sig_size(len(stub_bin))
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
stub_bin += '\x00' * auth_pad_length
sig_size = g.sig_size(len(stub_bin))
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_type,
auth_level=auth_level,
auth_blob=to_server)
req = self.generate_alter(call_id=call_id,
ctx_list=ctx_list,
- assoc_group_id=0xffffffff-assoc_group_id,
+ assoc_group_id=0xffffffff -assoc_group_id,
auth_info=auth_info)
self.send_pdu(req)
rep = self.recv_pdu()
else:
sig_size = 16
- zero_sig = "\x00"*sig_size
+ zero_sig = "\x00" * sig_size
auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
auth_level=auth_context["auth_level"],
auth_pad_length=auth_pad_length,
self.check_digest(USER_NAME,
self.netbios_domain,
USER_PASS,
- digests.hashes[1-1].hash)
+ digests.hashes[1 - 1].hash)
self.check_digest(USER_NAME.lower(),
self.netbios_domain.lower(),
USER_PASS,
- digests.hashes[2-1].hash)
+ digests.hashes[2 - 1].hash)
self.check_digest(USER_NAME.upper(),
self.netbios_domain.upper(),
USER_PASS,
- digests.hashes[3-1].hash)
+ digests.hashes[3 - 1].hash)
self.check_digest(USER_NAME,
self.netbios_domain.upper(),
USER_PASS,
- digests.hashes[4-1].hash)
+ digests.hashes[4 - 1].hash)
self.check_digest(USER_NAME,
self.netbios_domain.lower(),
USER_PASS,
- digests.hashes[5-1].hash)
+ digests.hashes[5 - 1].hash)
self.check_digest(USER_NAME.upper(),
self.netbios_domain.lower(),
USER_PASS,
- digests.hashes[6-1].hash)
+ digests.hashes[6 - 1].hash)
self.check_digest(USER_NAME.lower(),
self.netbios_domain.upper(),
USER_PASS,
- digests.hashes[7-1].hash)
+ digests.hashes[7 - 1].hash)
self.check_digest(USER_NAME,
self.dns_domain,
USER_PASS,
- digests.hashes[8-1].hash)
+ digests.hashes[8 - 1].hash)
self.check_digest(USER_NAME.lower(),
self.dns_domain.lower(),
USER_PASS,
- digests.hashes[9-1].hash)
+ digests.hashes[9 - 1].hash)
self.check_digest(USER_NAME.upper(),
self.dns_domain.upper(),
USER_PASS,
- digests.hashes[10-1].hash)
+ digests.hashes[10 - 1].hash)
self.check_digest(USER_NAME,
self.dns_domain.upper(),
USER_PASS,
- digests.hashes[11-1].hash)
+ digests.hashes[11 - 1].hash)
self.check_digest(USER_NAME,
self.dns_domain.lower(),
USER_PASS,
- digests.hashes[12-1].hash)
+ digests.hashes[12 - 1].hash)
self.check_digest(USER_NAME.upper(),
self.dns_domain.lower(),
USER_PASS,
- digests.hashes[13-1].hash)
+ digests.hashes[13 - 1].hash)
self.check_digest(USER_NAME.lower(),
self.dns_domain.upper(),
USER_PASS,
- digests.hashes[14-1].hash)
+ digests.hashes[14 - 1].hash)
self.check_digest(UPN,
"",
USER_PASS,
- digests.hashes[15-1].hash)
+ digests.hashes[15 - 1].hash)
self.check_digest(UPN.lower(),
"",
USER_PASS,
- digests.hashes[16-1].hash)
+ digests.hashes[16 - 1].hash)
self.check_digest(UPN.upper(),
"",
USER_PASS,
- digests.hashes[17-1].hash)
+ digests.hashes[17 - 1].hash)
name = "%s\\%s" % (self.netbios_domain, USER_NAME)
self.check_digest(name,
"",
USER_PASS,
- digests.hashes[18-1].hash)
+ digests.hashes[18 - 1].hash)
name = "%s\\%s" % (self.netbios_domain.lower(), USER_NAME.lower())
self.check_digest(name,
"",
USER_PASS,
- digests.hashes[19-1].hash)
+ digests.hashes[19 - 1].hash)
name = "%s\\%s" % (self.netbios_domain.upper(), USER_NAME.upper())
self.check_digest(name,
"",
USER_PASS,
- digests.hashes[20-1].hash)
+ digests.hashes[20 - 1].hash)
self.check_digest(USER_NAME,
"Digest",
USER_PASS,
- digests.hashes[21-1].hash)
+ digests.hashes[21 - 1].hash)
self.check_digest(USER_NAME.lower(),
"Digest",
USER_PASS,
- digests.hashes[22-1].hash)
+ digests.hashes[22 - 1].hash)
self.check_digest(USER_NAME.upper(),
"Digest",
USER_PASS,
- digests.hashes[23-1].hash)
+ digests.hashes[23 - 1].hash)
self.check_digest(UPN,
"Digest",
USER_PASS,
- digests.hashes[24-1].hash)
+ digests.hashes[24 - 1].hash)
self.check_digest(UPN.lower(),
"Digest",
USER_PASS,
- digests.hashes[25-1].hash)
+ digests.hashes[25 - 1].hash)
self.check_digest(UPN.upper(),
"Digest",
USER_PASS,
- digests.hashes[26-1].hash)
+ digests.hashes[26 - 1].hash)
name = "%s\\%s" % (self.netbios_domain, USER_NAME)
self.check_digest(name,
"Digest",
USER_PASS,
- digests.hashes[27-1].hash)
+ digests.hashes[27 - 1].hash)
name = "%s\\%s" % (self.netbios_domain.lower(), USER_NAME.lower())
self.check_digest(name,
"Digest",
USER_PASS,
- digests.hashes[28-1].hash)
+ digests.hashes[28 - 1].hash)
name = "%s\\%s" % (self.netbios_domain.upper(), USER_NAME.upper())
self.check_digest(name,
"Digest",
USER_PASS,
- digests.hashes[29-1].hash)
+ digests.hashes[29 - 1].hash)
def checkUserPassword(self, up, expected):
nwrap_winbind_active = (nwrap_module_so_path != "" and
nwrap_module_fn_prefix == "winbind")
- LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS)
SY_sid = security.dom_sid(security.SID_NT_SYSTEM)
self.assertEquals(facl.as_sddl(domsid),acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
- LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS)
SY_sid = security.dom_sid(security.SID_NT_SYSTEM)
self.assertEquals(facl.as_sddl(domsid),acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
- LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS)
SY_sid = security.dom_sid(security.SID_NT_SYSTEM)
AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS)
- PA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_POLICY_ADMINS))
+ PA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_POLICY_ADMINS))
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
nwrap_winbind_active = (nwrap_module_so_path != "" and
nwrap_module_fn_prefix == "winbind")
- LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ LA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS)
SY_sid = security.dom_sid(security.SID_NT_SYSTEM)
AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS)
- PA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_POLICY_ADMINS))
+ PA_sid = security.dom_sid(str(domsid) + "-" + str(security.DOMAIN_RID_POLICY_ADMINS))
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
encoded = newpass.encode('utf-16-le')
pwd_len = len(encoded)
- filler = [ord(x) for x in os.urandom(DATA_LEN-pwd_len)]
+ filler = [ord(x) for x in os.urandom(DATA_LEN - pwd_len)]
pwd = netlogon.netr_CryptPassword()
pwd.length = pwd_len
pwd.data = filler + [ord(x) for x in encoded]
good_dns = ["SAMDOM.EXAMPLE.COM",
"1.EXAMPLE.COM",
- "%sEXAMPLE.COM" % ("1."*100),
+ "%sEXAMPLE.COM" % ("1." * 100),
"EXAMPLE",
"\n.COM",
"!@#$%^&*()_",
def test_ntvfs(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
+ tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
def test_s3fs(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
+ tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
def test_ntvfs_check(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
+ tempf = os.path.join(path,"pytests" +str(int(100000 * random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(self.acl+"\n", out, "Output should be the ACL")
+ self.assertEquals(self.acl + "\n", out, "Output should be the ACL")
def test_s3fs_check(self):
path = os.environ['SELFTEST_PREFIX']
- tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
+ tempf = os.path.join(path,"pytests" + str(int(100000 *random.random())))
open(tempf, 'w').write("empty")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
- self.assertEquals(self.acl+"\n", out,"Output should be the ACL")
+ self.assertEquals(self.acl + "\n", out,"Output should be the ACL")
# Note: Does not correctly handle values spanning multiple lines,
# which is acceptable for it's usage in these tests.
def _get_attribute(out, name):
- p = re.compile("^"+name+":\s+(\S+)")
+ p = re.compile("^" + name + ":\s+(\S+)")
for line in out.split("\n"):
m = p.match(line)
if m:
PY3 = sys.version_info[0] == 3
addom = 'addom.samba.example.com/'
-test_contents = 'abcd'*256
-utf_contents = u'Süßigkeiten Äpfel '*128
-test_literal_bytes_embed_nulls = b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64'*256
+test_contents = 'abcd' * 256
+utf_contents = u'Süßigkeiten Äpfel ' * 128
+test_literal_bytes_embed_nulls = b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
binary_contents = b'\xff\xfe'
-binary_contents = binary_contents + "Hello cruel world of python3".encode('utf8')*128
+binary_contents = binary_contents + "Hello cruel world of python3".encode('utf8') * 128
test_dir = os.path.join(addom, 'testing_%d' % random.randint(0,0xFFFF))
test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
('longstring ' * 100 + 'a', 'longstring ' * 100, 'longstring ' * 100 + 'a'),
(KATAKANA_LETTER_A, KATAKANA_LETTER_A + 'bcd', None),
(KATAKANA_LETTER_A + 'bcde', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcde'),
- ('d'+KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd'),
- ('d'+KATAKANA_LETTER_A + 'bd', KATAKANA_LETTER_A + 'bcd', None),
+ ('d' +KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd'),
+ ('d' +KATAKANA_LETTER_A + 'bd', KATAKANA_LETTER_A + 'bcd', None),
- ('e'+KATAKANA_LETTER_A + 'bcdf', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcdf'),
+ ('e' + KATAKANA_LETTER_A + 'bcdf', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcdf'),
(KATAKANA_LETTER_A, KATAKANA_LETTER_A + 'bcd', None),
- (KATAKANA_LETTER_A*3, 'a', None),
+ (KATAKANA_LETTER_A * 3, 'a', None),
]
for a, b, expect in cases:
if expect is not None:
def _tmpfilename(self):
random.seed()
path = os.environ['SELFTEST_PREFIX']
- return os.path.join(path, "pytests"+str(int(100000*random.random())))
+ return os.path.join(path, "pytests" +str(int(100000 * random.random())))
def _eadbpath(self):
return os.path.join(os.environ['SELFTEST_PREFIX'], "eadb.tdb")
tab1 = p.split(str(x))
tab2 = p.split(str(y))
minimum = min(len(tab1), len(tab2))
- len1 = len(tab1)-1
- len2 = len(tab2)-1
+ len1 = len(tab1) -1
+ len2 = len(tab2) -1
# Note: python range go up to upper limit but do not include it
for i in range(0, minimum):
- ret = cmp_fn(tab1[len1-i], tab2[len2-i])
+ ret = cmp_fn(tab1[len1 - i], tab2[len2 - i])
if ret != 0:
return ret
else:
- if i == minimum-1:
- assert len1 != len2,"PB PB PB" + " ".join(tab1)+" / " + " ".join(tab2)
+ if i == minimum - 1:
+ assert len1 != len2,"PB PB PB" + " ".join(tab1) + " / " + " ".join(tab2)
if len1 > len2:
return 1
else:
for k in sorted_keys:
obj = hash_ts[k]
if obj["num"] > limit_print:
- dt = _glue.nttime2string(_glue.unix2nttime(k*60))
+ dt = _glue.nttime2string(_glue.unix2nttime(k * 60))
print("%s # of modification: %d \tmin: %d max: %d" % (dt, obj["num"],
obj["min"],
obj["max"]))
for i in range(0, len(kept_record)):
if i != 0:
key1 = kept_record[i]
- key2 = kept_record[i-1]
+ key2 = kept_record[i - 1]
if key1 - key2 == 1:
# previous record is just 1 minute away from current
if int(hash_ts[key1]["min"]) == int(hash_ts[key2]["max"]) + 1:
if num_lines < 50:
# Also include stderr (compile failures) if < 50 lines of stdout
f = open("%s/%s.stderr" % (gitroot, failed_tag), 'r')
- log_tail += "".join(f.readlines()[-(50-num_lines):])
+ log_tail += "".join(f.readlines()[-(50 - num_lines):])
text += '''
The last 50 lines of log messages:
for suite in self.suitesfailed:
f.write("== %s ==\n" % suite)
if suite in self.test_output:
- f.write(self.test_output[suite]+"\n\n")
+ f.write(self.test_output[suite] + "\n\n")
f.write("\n")
else:
ldaphost = host
start = host.rindex("://")
- host = host.lstrip(start+3)
+ host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# user current time in ms to make unique objects
import time
- marker = str(int(round(time.time()*1000)))
+ marker = str(int(round(time.time() * 1000)))
usr1_name = "u_" + marker
usr2_name = "u2_" + marker
grp_name = "g1_" + marker
else:
ldaphost = host
start = host.rindex("://")
- host = host.lstrip(start+3)
+ host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
scope=SCOPE_BASE, attrs=["objectClass"])
self.assertTrue(len(res) == 1)
self.assertEquals(res[0]["objectClass"][0], "top")
- self.assertEquals(res[0]["objectClass"][len(res[0]["objectClass"])-1], "user")
+ self.assertEquals(res[0]["objectClass"][len(res[0]["objectClass"]) - 1], "user")
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["subSchemaSubEntry"])
self.assertEquals(len(res), 1)
- self.assertEquals(res[0]["subSchemaSubEntry"][0], "CN=Aggregate,"+self.schema_dn)
+ self.assertEquals(res[0]["subSchemaSubEntry"][0], "CN=Aggregate," + self.schema_dn)
res = self.ldb.search(base=self.base_dn, scope=SCOPE_BASE, attrs=["*"])
self.assertEquals(len(res), 1)
def test_generated_schema(self):
"""Testing we can read the generated schema via LDAP"""
- res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
+ res = self.ldb.search("cn=aggregate," + self.schema_dn, scope=SCOPE_BASE,
attrs=["objectClasses", "attributeTypes", "dITContentRules"])
self.assertEquals(len(res), 1)
self.assertTrue("dITContentRules" in res[0])
def test_generated_schema_is_operational(self):
"""Testing we don't get the generated schema via LDAP by default"""
# Must keep the "*" form
- res = self.ldb.search("cn=aggregate,"+self.schema_dn, scope=SCOPE_BASE,
+ res = self.ldb.search("cn=aggregate," + self.schema_dn, scope=SCOPE_BASE,
attrs=["*"])
self.assertEquals(len(res), 1)
self.assertFalse("dITContentRules" in res[0])
adminDisplayName: """ + attr_name_3 + """
cn: """ + attr_name_3 + """
attributeId: """ + attributeID_3 + """
-linkID: """ + str(linkID_1+1) + """
+linkID: """ + str(linkID_1 + 1) + """
attributeSyntax: 2.5.5.1
ldapDisplayName: """ + attr_ldap_display_name_3 + """
omSyntax: 127
use_kerberos = creds.get_kerberos_state()
fail_creds = self.insta_creds(self.template_creds,
username=username,
- userpass=userpass+"X",
+ userpass=userpass + "X",
kerberos_state=use_kerberos)
self._check_account_initial(userdn)
use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
fail_creds = self.insta_creds(self.template_creds,
username=username,
- userpass=userpass+"X",
+ userpass=userpass + "X",
kerberos_state=use_kerberos)
try:
use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
fail_creds = self.insta_creds(self.template_creds,
username=username,
- userpass=userpass+"X",
+ userpass=userpass + "X",
kerberos_state=use_kerberos)
try:
use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
fail_creds = self.insta_creds(self.template_creds,
username=username,
- userpass=userpass+"X",
+ userpass=userpass + "X",
kerberos_state=use_kerberos)
try:
use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
fail_creds = self.insta_creds(self.template_creds,
username=username,
- userpass=userpass+"X",
+ userpass=userpass + "X",
kerberos_state=use_kerberos)
try:
chr(i & 255),
i),
"displayNamePrintable": "%d\x00%c" % (i, i & 255),
- "adminDisplayName": "%d\x00b" % (n-i),
+ "adminDisplayName": "%d\x00b" % (n - i),
"title": "%d%sb" % (n - i, '\x00' * i),
# Names that vary only in case. Windows returns
# urgent replication should be enabled when modifying userAccountControl
m = Message()
m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
- m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
+ m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT + dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
"userAccountControl")
self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
else:
ldaphost = host
start = host.rindex("://")
- host = host.lstrip(start+3)
+ host = host.lstrip(start + 3)
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
"audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
chr(i & 255), i),
"displayNamePrintable": "%d\x00%c" % (i, i & 255),
- "adminDisplayName": "%d\x00b" % (n-i),
+ "adminDisplayName": "%d\x00b" % (n - i),
"title": "%d%sb" % (n - i, '\x00' * i),
"comment": "Favourite colour is %d" % (n % (i + 1)),
store_utdv_len, store_utdv_ofs) = \
struct.unpack("<LLLLLLL", store_hdr)
- store_dn = store_blob[store_dn_ofs:store_dn_ofs+store_dn_len]
- store_hwm_blob = store_blob[store_hwm_ofs:store_hwm_ofs+store_hwm_len]
- store_utdv_blob = store_blob[store_utdv_ofs:store_utdv_ofs+store_utdv_len]
+ store_dn = store_blob[store_dn_ofs:store_dn_ofs + store_dn_len]
+ store_hwm_blob = store_blob[store_hwm_ofs:store_hwm_ofs + store_hwm_len]
+ store_utdv_blob = store_blob[store_utdv_ofs:store_utdv_ofs + store_utdv_len]
store_hwm = ndr_unpack(drsuapi.DsReplicaHighWaterMark, store_hwm_blob)
store_utdv = ndr_unpack(drsblobs.replUpToDateVectorBlob, store_utdv_blob)
def create_bundle(self, count):
for i in range(count):
- self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+ self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
def remove_bundle(self, count):
for i in range(count):
- delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+ delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
def remove_test_users(self):
res = ldb.search(base="cn=Users,%s" % self.base_dn, expression="(objectClass=user)", scope=SCOPE_SUBTREE)
mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
for i in range(num):
self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" %
- (i+1, self.base_dn), mod)
+ (i + 1, self.base_dn), mod)
print("\n=== %s user objects created ===\n" % num)
print("\n=== Test search on %s user objects ===\n" % num)
avg_search = Decimal("0.0")
rodc_creds = Credentials()
rodc_creds.guess(self.rodc_ctx.lp)
- rodc_creds.set_username(self.rodc_name+'$')
+ rodc_creds.set_username(self.rodc_name + '$')
rodc_creds.set_password(self.rodc_pass)
self.rodc_creds = rodc_creds
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
# Change the user's password on DC1
- self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
(packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
(packed_attrs_1, unpacked_attrs_1) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
# Change the user's password on DC1
- self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password+"1", False, user_name)
+ self.ldb_dc1.setpassword("(sAMAccountName=%s)" % user_name, password + "1", False, user_name)
(packed_attrs_2, unpacked_attrs_2) = self._assert_in_revealed_users(user_dn, expected_user_attributes)
self._assert_attrlist_equals(unpacked_attrs_1, unpacked_attrs_2)
other_rodc_creds = Credentials()
other_rodc_creds.guess(other_rodc_ctx.lp)
- other_rodc_creds.set_username(other_rodc_name+'$')
+ other_rodc_creds.set_username(other_rodc_name + '$')
other_rodc_creds.set_password(self.rodc_pass)
(other_rodc_drs, other_rodc_drs_handle) = self._ds_bind(self.dnsname_dc1, other_rodc_creds)
if colon == -1:
raise RuntimeError("Invalid config line '%s'" % line)
varname = line[0:colon].strip()
- value = line[colon+1:].strip()
+ value = line[colon + 1:].strip()
self.setvar(varname, value)
def list_steps_mode(self):
var_end = text.find("}", var_start)
if var_end == -1:
return text
- var_name = text[var_start+2:var_end]
+ var_name = text[var_start + 2:var_end]
if not var_name in self.vars:
raise RuntimeError("Unknown substitution variable ${%s}" % var_name)
text = text.replace("${%s}" % var_name, self.vars[var_name])