newer_file = True
if newer_file:
- utime = None # Sets to current time.
+ utime = None # Sets to current time.
print '...Setting fake mtime to NOW. Will trigger re-index.'
else:
mtime = os.stat(media_dir)[stat.ST_MTIME]
unapply_attributes = gp_db.list(gp_extensions)
for attr in unapply_attributes:
attr_obj = attr[-1](logger, test_ldb, gp_db, lp, attr[0], attr[1])
- attr_obj.mapper()[attr[0]][0](attr[1]) # Set the old value
+ attr_obj.mapper()[attr[0]][0](attr[1]) # Set the old value
gp_db.delete(str(attr_obj), attr[0])
gp_db.commit()
'fTUPLEINDEX': 26, # TP
'fSUBTREEATTINDEX': 25, # ST
'fCONFIDENTIAL': 24, # CF
- 'fCONFIDENTAIL': 24, # typo
+ 'fCONFIDENTAIL': 24, # typo
'fNEVERVALUEAUDIT': 23, # NV
'fRODCAttribute': 22, # RO
# missing in ADTS but required by LDIF
'fRODCFilteredAttribute': 22, # RO
- 'fRODCFILTEREDATTRIBUTE': 22, # case
+ 'fRODCFILTEREDATTRIBUTE': 22, # case
'fEXTENDEDLINKTRACKING': 21, # XL
'fBASEONLY': 20, # BO
'fPARTITIONSECRET': 19, # SE
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
- Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
+ Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
Option("--forest-level", type="choice", choices=["2003", "2008", "2008_R2", "2012", "2012_R2"],
help="The forest function level (2003 | 2008 | 2008_R2 | 2012 | 2012_R2)"),
Option("--domain-level", type="choice", choices=["2003", "2008", "2008_R2", "2012", "2012_R2"],
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
- Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
+ Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
Option("--complexity", type="choice", choices=["on", "off", "default"],
help="The password complexity (on | off | default). Default is 'on'"),
Option("--store-plaintext", type="choice", choices=["on", "off", "default"],
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
- Option("-q", "--quiet", help="Be quiet", action="store_true"), #unused
+ Option("-q", "--quiet", help="Be quiet", action="store_true"), # unused
Option("-v", "--verbose", help="Be verbose", action="store_true"),
Option("--schema", type="choice", metavar="SCHEMA",
choices=["2012", "2012_R2"],
"uSNChanged",
"uSNCreated",
"uSNLastObjRem",
- "whenChanged", # This is implicitly replicated, but may diverge on updates of non-replicated attributes
+ "whenChanged", # This is implicitly replicated, but may diverge on updates of non-replicated attributes
]
self.ignore_attributes = self.non_replicated_attributes
self.ignore_attributes += ["msExchServer1HighestUSN"]
self.summary["unique_attrs"] += self.unique_attrs
self.summary["df_value_attrs"] += self.df_value_attrs
other.summary["unique_attrs"] += other.unique_attrs
- other.summary["df_value_attrs"] += self.df_value_attrs # they are the same
+ other.summary["df_value_attrs"] += self.df_value_attrs # they are the same
#
self.screen_output = res[:-1]
other.screen_output = res[:-1]
if pid == 0:
os.setsid()
pid = os.fork()
- if pid == 0: # Actual daemon
+ if pid == 0: # Actual daemon
pid = os.getpid()
log_msg("Daemonized as pid %d (from %d)\n" % (pid, orig_pid))
load_cache()
import resource # Resource usage information.
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd == resource.RLIM_INFINITY:
- maxfd = 1024 # Rough guess at maximum number of open file descriptors.
+ maxfd = 1024 # Rough guess at maximum number of open file descriptors.
logfd = os.open(logfile, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
self.outf.write("Using logfile[%s]\n" % logfile)
for fd in range(0, maxfd):
f = open(file, 'r')
assert f.readline().rstrip("\n") == "VERSION 1 0"
for l in f.readlines():
- if l[0] == "#": # skip comments
+ if l[0] == "#": # skip comments
continue
entries = shellsplit(l.rstrip("\n"))
name = entries[0]
"""Return the entries in this WINS database."""
return self.entries.items()
- def close(self): # for consistency
+ def close(self): # for consistency
pass
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
- record.rank = 0 # DNS_RANK_NONE
+ record.rank = 0 # DNS_RANK_NONE
res = self.samdb.dns_replace_by_dn(dn, [record])
if res is not None:
self.fail("Unable to update dns record to have DNS_RANK_NONE.")
handle = self.conn.OpenHKLM(None,
winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS)
x = self.conn.QueryInfoKey(handle, winreg.String())
- self.assertEquals(9, len(x)) # should return a 9-tuple
+ self.assertEquals(9, len(x)) # should return a 9-tuple
self.conn.CloseKey(handle)
share.current_users = 0x00000000
share.max_users = -1
share.password = None
- share.path = u'C:\\tmp' # some random path
+ share.path = u'C:\\tmp' # some random path
share.permissions = 123434566
return share
print("ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e))
self.errcount += 1
elif callable(value):
- pass # Method
+ pass # Method
else:
print("UNKNOWN: %s=%s" % (n, value))
if self.errcount - errcount != 0:
from samba.samdb import SamDB
unix_now = int(time.time())
-unix_once_upon_a_time = 1000000000 #2001-09-09
+unix_once_upon_a_time = 1000000000 # 2001-09-09
ENV_DSAS = {
'ad_dc_ntvfs': ['CN=LOCALDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
username = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
unix_username = "%s/%s" % (domain, username)
- expected_rc = 0 # PAM_SUCCESS
+ expected_rc = 0 # PAM_SUCCESS
tc = pypamtest.TestCase(pypamtest.PAMTEST_AUTHENTICATE, expected_rc)
res = pypamtest.run_pamtest(unix_username, "samba", [tc], [password])
username = os.environ["USERNAME"]
password = "WrongPassword"
unix_username = "%s/%s" % (domain, username)
- expected_rc = 7 # PAM_AUTH_ERR
+ expected_rc = 7 # PAM_AUTH_ERR
tc = pypamtest.TestCase(pypamtest.PAMTEST_AUTHENTICATE, expected_rc)
res = pypamtest.run_pamtest(unix_username, "samba", [tc], [password])
# Authenticate again to check that we are not locked out with just one
# failed login
password = os.environ["PASSWORD"]
- expected_rc = 0 # PAM_SUCCESS
+ expected_rc = 0 # PAM_SUCCESS
tc = pypamtest.TestCase(pypamtest.PAMTEST_AUTHENTICATE, expected_rc)
res = pypamtest.run_pamtest(unix_username, "samba", [tc], [password])
password = os.environ["PASSWORD"]
warn_pwd_expire = int(os.environ["WARN_PWD_EXPIRE"])
unix_username = "%s/%s" % (domain, username)
- expected_rc = 0 # PAM_SUCCESS
+ expected_rc = 0 # PAM_SUCCESS
tc = pypamtest.TestCase(pypamtest.PAMTEST_AUTHENTICATE, expected_rc)
res = pypamtest.run_pamtest(unix_username, "samba", [tc], [password])
% (record_str, record_type_str))
dn, record = self.get_record_from_db(self.zone, "testrecord")
- record.rank = 0 # DNS_RANK_NONE
+ record.rank = 0 # DNS_RANK_NONE
res = self.samdb.dns_replace_by_dn(dn, [record])
if res is not None:
self.fail("Unable to update dns record to have DNS_RANK_NONE.")
try:
self.assertCmdFail(result)
except AssertionError:
- continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
+ continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
# Check both ways: Give the current type and try to update,
# and give the new type and try to update.
# we can't enter the wrong current value for a given record.
for dnstype in self.good_records:
if len(self.good_records[dnstype]) < 3:
- continue # Not enough records of this type to do this test
+ continue # Not enough records of this type to do this test
used_record = self.good_records[dnstype][0]
unused_record = self.good_records[dnstype][1]
def test_site_subnet_create(self):
cidrs = (("10.9.8.0/24", self.sitename),
("50.60.0.0/16", self.sitename2),
- ("50.61.0.0/16", self.sitename2), # second subnet on the site
- ("50.0.0.0/8", self.sitename), # overlapping subnet, other site
- ("50.62.1.2/32", self.sitename), # single IP
+ ("50.61.0.0/16", self.sitename2), # second subnet on the site
+ ("50.0.0.0/8", self.sitename), # overlapping subnet, other site
+ ("50.62.1.2/32", self.sitename), # single IP
("aaaa:bbbb:cccc:dddd:eeee:ffff:2222:1100/120",
self.sitename2),
)
def test_site_subnet_create_should_fail(self):
cidrs = (("10.9.8.0/33", self.sitename), # mask too big
("50.60.0.0/8", self.sitename2), # insufficient zeros
- ("50.261.0.0/16", self.sitename2), # bad octet
+ ("50.261.0.0/16", self.sitename2), # bad octet
("7.0.0.0.0/0", self.sitename), # insufficient zeros
("aaaa:bbbb:cccc:dddd:eeee:ffff:2222:1100/119",
self.sitename), # insufficient zeros
rType = 0x0
if ttl > time.time():
- rState = 0x0 # active
+ rState = 0x0 # active
else:
- rState = 0x1 # released
+ rState = 0x1 # released
nType = ((nb_flags & 0x60) >> 5)
def daemonize(logfile):
pid = os.fork()
- if pid == 0: # Parent
+ if pid == 0: # Parent
os.setsid()
pid = os.fork()
- if pid != 0: # Actual daemon
+ if pid != 0: # Actual daemon
os._exit(0)
- else: # Grandparent
+ else: # Grandparent
os._exit(0)
import resource # Resource usage information.
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd == resource.RLIM_INFINITY:
- maxfd = 1024 # Rough guess at maximum number of open file descriptors.
+ maxfd = 1024 # Rough guess at maximum number of open file descriptors.
for fd in range(0, maxfd):
try:
os.close(fd)
class FilterOps(unittest.TestResult):
def control_msg(self, msg):
- pass # We regenerate control messages, so ignore this
+ pass # We regenerate control messages, so ignore this
def time(self, time):
self._ops.time(time)
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
def _test_complex_search(self, n=100):
classes = ['samaccountname', 'objectCategory', 'dn', 'member']
values = ['*', '*t*', 'g*', 'user']
- comparators = ['=', '<=', '>='] # '~=' causes error
+ comparators = ['=', '<=', '>='] # '~=' causes error
maybe_not = ['!(', '']
joiners = ['&', '|']
def _test_complex_search(self):
classes = ['samaccountname', 'objectCategory', 'dn', 'member']
values = ['*', '*t*', 'g*', 'user']
- comparators = ['=', '<=', '>='] # '~=' causes error
+ comparators = ['=', '<=', '>='] # '~=' causes error
maybe_not = ['!(', '']
joiners = ['&', '|']
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
self.assertFalse("description" in res[0])
self.assertTrue("objectGUID" in res[0])
self.assertTrue("uSNCreated" in res[0])
- self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected
+ self.assertFalse(res[0]["uSNCreated"][0] == "1") # these are corrected
self.assertTrue("uSNChanged" in res[0])
- self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected
+ self.assertFalse(res[0]["uSNChanged"][0] == "1") # these are corrected
self.assertTrue("whenCreated" in res[0])
self.assertTrue("whenChanged" in res[0])
except LdbError as e35:
(num, _) = e35.args
self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
- num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
+ num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
delete_force(self.ldb, "cn=testuser2,cn=users," + self.base_dn)
except LdbError as e39:
(num, _) = e39.args
self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
- num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
+ num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
m = Message()
m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
except LdbError as e43:
(num, _) = e43.args
self.assertTrue(num == ERR_UNWILLING_TO_PERFORM or
- num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
+ num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
m = Message()
m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
except LdbError as e47:
(num, _) = e47.args
self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
- num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
+ num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
def test_plain_userPassword(self):
print("Performs testing about the standard 'userPassword' behaviour")
# Delete the "dSHeuristics"
self.ldb.set_dsheuristics(None)
- time.sleep(1) # This switching time is strictly needed!
+ time.sleep(1) # This switching time is strictly needed!
m = Message()
m.dn = Dn(self.ldb, "cn=testuser,cn=users," + self.base_dn)
""")
except LdbError, (num, msg):
self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
- num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
+ num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
else:
self.fail()
set_auto_replication(RWDC, True)
def setUp(self):
- self.kerberos = False # To be set later
+ self.kerberos = False # To be set later
self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
session_info=system_session(LP), lp=LP)
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
return ldb_target
def check_modify_inheritance(self, _ldb, object_dn, owner_group=""):
# Modify
sd_user_utils = sd_utils.SDUtils(_ldb)
- ace = "(D;;CC;;;LG)" # Deny Create Children to Guest account
+ ace = "(D;;CC;;;LG)" # Deny Create Children to Guest account
if owner_group != "":
sd_user_utils.modify_sd_on_dn(object_dn, owner_group + "D:" + ace)
else:
# Make sure created group object contains only the above inherited ACE
# that we've added manually
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
- mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
+ mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
self.sd_utils.modify_sd_on_dn(group_dn, "D:" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
# Make sure created group object contains only the above inherited ACE
# that we've added manually
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
- mod = mod.replace(";CI;", ";CIID;") # change it how it's gonna look like
+ mod = mod.replace(";CI;", ";CIID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
self.sd_utils.modify_sd_on_dn(group_dn, "D:" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
# Make sure created group object contains only the above inherited ACE
# that we've added manually
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
- mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
+ mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
self.sd_utils.modify_sd_on_dn(group_dn, "D:" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
# Make sure created group object contains only the above inherited ACE
# that we've added manually
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
- mod = mod.replace(";CI;", ";CIID;") # change it how it's gonna look like
+ mod = mod.replace(";CI;", ";CIID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
self.sd_utils.modify_sd_on_dn(group_dn, "D:" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
# Make sure created group object contains only the above inherited ACE
# that we've added manually
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
- mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
+ mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
self.sd_utils.modify_sd_on_dn(group_dn, "D:(OA;OI;WP;bf967a39-0de6-11d0-a285-00aa003049e2;;DU)" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
# because it uses a inet_pton / inet_ntop round trip to
# ascertain correctness.
- "::ffff:0:0/96", #this one fails on WIN2012r2
+ "::ffff:0:0/96", # this one fails on WIN2012r2
"::ffff:aaaa:a000/120",
"::ffff:10:0/120",
"::ffff:2:300/120",
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
- creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
return creds_tmp
def setUp(self):
try:
_swig_property = property
except NameError:
- pass # Python < 2.2 doesn't have 'property'.
+ pass # Python < 2.2 doesn't have 'property'.
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
if (name == "thisown"): return self.this.own(value)
if (name == "this"):
names = drsuapi.DsNameString()
names.str = name
- req.codepage = 1252 # German, but it doesn't really matter here
+ req.codepage = 1252 # German, but it doesn't really matter here
req.language = 1033
req.format_flags = 0
req.format_offered = format_offered
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e:
(enum, estr) = e.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
# send the same request again and we should get the same response
try:
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e1:
(enum, estr) = e1.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
# Retry with Administrator credentials, ignores password replication groups
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 10, req10)
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e3:
(enum, estr) = e3.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
req10 = self._getnc_req10(dest_dsa=str(self.rodc_ctx.ntds_guid),
invocation_id=self.ldb_dc1.get_invocation_id(),
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e4:
(enum, estr) = e4.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
def test_msDSRevealedUsers_local_deny_allow(self):
"""
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e5:
(enum, estr) = e5.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
m = ldb.Message()
m.dn = ldb.Dn(self.ldb_dc1, self.computer_dn)
self.fail("Successfully replicated secrets to an RODC that shouldn't have been replicated.")
except WERRORError as e6:
(enum, estr) = e6.args
- self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
+ self.assertEquals(enum, 8630) # ERROR_DS_DRA_SECRETS_DENIED
def _assert_in_revealed_users(self, user_dn, attrlist):
res = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=self.computer_dn,
"Last attempt .* was successful",
"CN=Configuration,${BASEDN}",
"Last attempt .* was successful",
- "CN=Configuration,${BASEDN}", # cope with either order
+ "CN=Configuration,${BASEDN}", # cope with either order
"Last attempt .* was successful",
"OUTBOUND NEIGHBORS",
"${BASEDN}",
i = child.expect(["You must restart this computer", "failed", "Active Directory Domain Services was not installed", "C:", pexpect.TIMEOUT], timeout=240)
if i == 1 or i == 2:
raise Exception("dcpromo failed")
- if i == 4: # timeout
+ if i == 4: # timeout
child = self.open_telnet("${WIN_HOSTNAME}", "administrator", "${WIN_PASS}")
child.sendline("shutdown -r -t 0")