classes_ldif = ""
if dump_attributes:
- attr_ldif = __parse_schema_file(attr_file, "attributeSchema")
+ attr_ldif = __parse_schema_file(attr_file, "attributeSchema")
if dump_classes:
classes_ldif = __parse_schema_file(classes_file, "classSchema")
uac = int(res[0].get("userAccountControl")[0])
allowed = res[0].get("msDS-AllowedToDelegateTo")
- self.outf.write("Account-DN: %s\n" % str(res[0].dn))
+ self.outf.write("Account-DN: %s\n" % str(res[0].dn))
self.outf.write("UF_TRUSTED_FOR_DELEGATION: %s\n"
% bool(uac & dsdb.UF_TRUSTED_FOR_DELEGATION))
self.outf.write("UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION: %s\n" %
enc_types.enc_types = security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
enc_types.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
- local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+ local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
local_policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
local_policy_access |= lsa.LSA_POLICY_CREATE_SECRET
def run(self, domain, sambaopts=None, localdcopts=None, credopts=None, versionopts=None,
delete_location=None):
- local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+ local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
local_policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
local_policy_access |= lsa.LSA_POLICY_CREATE_SECRET
if delete_location == "local":
remote_policy_access = None
else:
- remote_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+ remote_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
remote_policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
remote_policy_access |= lsa.LSA_POLICY_CREATE_SECRET
def run(self, domain, sambaopts=None, versionopts=None, credopts=None, localdcopts=None,
validate_location=None):
- local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+ local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
local_server = self.setup_local_server(sambaopts, localdcopts)
try:
continue
raise CommandError("value[%s] specified for --enable-sid and --disable-sid" % e)
- local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
+ local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
if require_update:
local_policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
# Two domains - two domain controllers
if self.two_domains:
- self.ignore_attributes += [
+ self.ignore_attributes += [
"objectCategory", "objectGUID", "objectSid", "whenCreated",
"whenChanged", "pwdLastSet", "uSNCreated", "creationTime",
"modifiedCount", "priorSetTime", "rIDManagerReference",
if self.current_pid is not None:
log_msg("currentPid: %d\n" % self.current_pid)
- modify_ldif = "dn: %s\n" % (self.cache_dn)
+ modify_ldif = "dn: %s\n" % (self.cache_dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: currentPid\n"
if self.current_pid is not None:
self.dirsync_controls = [str(res_controls[0]),"extended_dn:1:0"]
log_msg("dirsyncControls: %r\n" % self.dirsync_controls)
- modify_ldif = "dn: %s\n" % (self.cache_dn)
+ modify_ldif = "dn: %s\n" % (self.cache_dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: dirsyncControl\n"
modify_ldif += "dirsyncControl: %s\n" % (self.dirsync_controls[0])
add_ldif += "currentTime: %s\n" % ldb.timestring(int(time.time()))
self.cache.add_ldif(add_ldif)
else:
- modify_ldif = "dn: %s\n" % (dn)
+ modify_ldif = "dn: %s\n" % (dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: lastCookie\n"
modify_ldif += "lastCookie: %s\n" % (lastCookie)
if str(ace.trustee) == security.SID_CREATOR_OWNER:
# For Creator/Owner the IO flag is set as this ACE has only a sense for child objects
ace.flags = ace.flags | security.SEC_ACE_FLAG_INHERIT_ONLY
- ace.access_mask = ldapmask2filemask(ace.access_mask)
+ ace.access_mask = ldapmask2filemask(ace.access_mask)
fdescr.dacl_add(ace)
if not as_sddl:
count = count + 1
if count > 15:
- self.logger.error("Could not connect to slapd started with: %s" % "\'" + "\' \'".join(self.slapd_provision_command) + "\'")
+ self.logger.error("Could not connect to slapd started with: %s" % "\'" + "\' \'".join(self.slapd_provision_command) + "\'")
raise ProvisioningError("slapd never accepted a connection within 15 seconds of starting")
self.logger.error("Could not start slapd with: %s" % "\'" + "\' \'".join(self.slapd_provision_command) + "\'")
retcode = subprocess.call(slapd_cmd, close_fds=True, shell=False)
if retcode != 0:
- self.logger.error("conversion from slapd.conf to cn=config failed slapd started with: %s" % "\'" + "\' \'".join(slapd_cmd) + "\'")
+ self.logger.error("conversion from slapd.conf to cn=config failed slapd started with: %s" % "\'" + "\' \'".join(slapd_cmd) + "\'")
raise ProvisioningError("conversion from slapd.conf to cn=config failed")
if not os.path.exists(os.path.join(self.olcdir, "cn=config.ldif")):
def get_domainguid(samdb, domaindn):
res = samdb.search(base=domaindn, scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
- domainguid = str(ndr_unpack(misc.GUID, res[0]["objectGUID"][0]))
+ domainguid = str(ndr_unpack(misc.GUID, res[0]["objectGUID"][0]))
return domainguid
features1 = 0
btf1 = base.bind_time_features_syntax(features1)
- features2 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+ features2 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
features2 |= dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
btf2 = base.bind_time_features_syntax(features2)
else:
auth_info=""
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
if object is not None:
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
super(NetBiosTests, self).setUp()
self.n = netbios.Node()
self.ifc = os.environ["SERVER_IP"]
- self.dc = os.environ["DC_NETBIOSNAME"]
+ self.dc = os.environ["DC_NETBIOSNAME"]
def tearDown(self):
super(NetBiosTests, self).tearDown()
def test_query_name(self):
# test adding groups
for group in self.groups:
- (result, out, err) = self.runsubcmd("group", "add", group["name"],
+ (result, out, err) = self.runsubcmd("group", "add", group["name"],
"--description=%s" % group["description"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
def test_move(self):
full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
- (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
+ (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "There shouldn't be any error message")
self.assertIn('Created ou "%s"' % full_ou_dn, out)
(group["name"], full_ou_dn), out)
# Should fail as groups objects are in OU
- (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
+ (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
self.assertCmdFail(result)
self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
"(it has %d children)!") % len(self.groups), err)
def test_ntvfs(self):
- (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
def test_s3fs(self):
- (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
def test_ntvfs_check(self):
- (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
- (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
def test_s3fs_check(self):
- (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
# Now check they were set correctly
- (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
+ (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
open(tempf, 'w').write("empty")
- (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
open(tempf, 'w').write("empty")
- (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
open(tempf, 'w').write("empty")
- (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
- (result, out, err) = self.runsubcmd("ntacl", "get", tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-ntvfs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
tempf = os.path.join(path,"pytests"+str(int(100000*random.random())))
open(tempf, 'w').write("empty")
- (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertEquals(err,"","Shouldn't be any error messages")
# Now check they were set correctly
- (result, out, err) = self.runsubcmd("ntacl", "get", tempf,
+ (result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-s3fs", "--as-sddl")
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
# test adding users with --use-username-as-cn
for user in self.users:
- (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
+ (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
"--use-username-as-cn",
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
def test_move(self):
full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
- (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
+ (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "There shouldn't be any error message")
self.assertIn('Created ou "%s"' % full_ou_dn, out)
(user["name"], full_ou_dn), out)
# Should fail as users objects are in OU
- (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
+ (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
self.assertCmdFail(result)
self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
"(it has %d children)!") % len(self.users), err)
machinepass = None
if samba3.lp.get("passdb backend").split(":")[0].strip() == "ldapsam":
- base_dn = samba3.lp.get("ldap suffix")
+ base_dn = samba3.lp.get("ldap suffix")
ldapuser = samba3.lp.get("ldap admin dn")
ldappass = secrets_db.get_ldap_bind_pw(ldapuser)
if ldappass is None:
# And so opening them create a file in the current directory which is not what
# we want
# I still keep them commented because I plan soon to make more cleaner
-ERROR = -1
-SIMPLE = 0x00
-CHANGE = 0x01
-CHANGESD = 0x02
-GUESS = 0x04
-PROVISION = 0x08
-CHANGEALL = 0xff
+ERROR = -1
+SIMPLE = 0x00
+CHANGE = 0x01
+CHANGESD = 0x02
+GUESS = 0x04
+PROVISION = 0x08
+CHANGEALL = 0xff
hashAttrNotCopied = set(["dn", "whenCreated", "whenChanged", "objectGUID",
"uSNCreated", "replPropertyMetaData", "uSNChanged", "parentGUID",
cont = True
ok = False
while cont:
- if idx == len(range):
+ if idx == len(range):
cont = False
continue
if usn < int(range[idx]):
ctdb_configure_params = " --enable-developer --picky-developer ${PREFIX}"
samba_configure_params = " --picky-developer ${PREFIX} ${EXTRA_PYTHON} --with-profiling-data"
-samba_libs_envvars = "PYTHONPATH=${PYTHON_PREFIX}/site-packages:$PYTHONPATH"
+samba_libs_envvars = "PYTHONPATH=${PYTHON_PREFIX}/site-packages:$PYTHONPATH"
samba_libs_envvars += " PKG_CONFIG_PATH=$PKG_CONFIG_PATH:${PREFIX_DIR}/lib/pkgconfig"
samba_libs_envvars += " ADDITIONAL_CFLAGS='-Wmissing-prototypes'"
samba_libs_configure_base = samba_libs_envvars + " ./configure --abi-check --enable-debug --picky-developer -C ${PREFIX}"
flush_cache(sids=sids)
-sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
+sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
stdout=subprocess.PIPE).communicate()[0].strip()
gids=[]
# Check the list produced by the sids-to-xids call with the
# multiple variant (sid-to-xid) for each sid in turn.
def check_multiple(sids, idtypes):
- sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
+ sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
stdout=subprocess.PIPE).communicate()[0].strip()
# print sids2xids
i = 0
elif t == "raw.chkpath":
plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER_IP/tmpcase -U$USERNAME%$PASSWORD')
plansmbtorture4testsuite(t, "ad_dc", '//$SERVER_IP/tmpcase -U$USERNAME%$PASSWORD')
- elif t == "raw.samba3hide" or t == "raw.samba3checkfsp" or t == "raw.samba3closeerr":
+ elif t == "raw.samba3hide" or t == "raw.samba3checkfsp" or t == "raw.samba3closeerr":
plansmbtorture4testsuite(t, "nt4_dc", '//$SERVER_IP/tmp -U$USERNAME%$PASSWORD')
plansmbtorture4testsuite(t, "simpleserver", '//$SERVER_IP/tmp -U$USERNAME%$PASSWORD')
plansmbtorture4testsuite(t, "ad_dc", '//$SERVER/tmp -U$USERNAME%$PASSWORD')
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_sm)
+Member: """ + self.get_user_dn(self.user_with_sm)
#the user has no rights granted, this should fail
try:
self.ldb_user2.modify_ldif(ldif)
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_sm) + """
+Member: """ + self.get_user_dn(self.user_with_sm) + """
Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
#grant self-membership, should be able to add himself but not others at the same time
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
add: Member
-Member: """ + self.get_user_dn(self.user_with_wp)
+Member: """ + self.get_user_dn(self.user_with_wp)
self.ldb_user.modify_ldif(ldif)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
% ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
ctx.secure_channel_type = misc.SEC_CHAN_RODC
ctx.RODC = True
- ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
- drsuapi.DRSUAPI_DRS_PER_SYNC |
- drsuapi.DRSUAPI_DRS_GET_ANC |
- drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
- drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
ctx.join_add_objects()
def setUp(self):
super(SchemaTests_msDS_isRODC, self).setUp()
- self.ldb = SamDB(host, credentials=creds,
+ self.ldb = SamDB(host, credentials=creds,
session_info=system_session(lp), lp=lp, options=ldb_options)
res = self.ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["defaultNamingContext"])
self.assertEquals(len(res), 1)
def test_pack_repl_sample(self):
blob = self.get_file_blob('testdata/replication-ndrpack-example.gz')
- desc = ndr_unpack(drsuapi.DsGetNCChangesCtr6, blob)
+ desc = ndr_unpack(drsuapi.DsGetNCChangesCtr6, blob)
self._test_pack(desc, cycles=20)
if "://" not in host:
ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
ctx.secure_channel_type = misc.SEC_CHAN_RODC
ctx.RODC = True
- ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
- drsuapi.DRSUAPI_DRS_PER_SYNC |
- drsuapi.DRSUAPI_DRS_GET_ANC |
- drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
- drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
ctx.join_add_objects()
ldb_dc1.modify(m)
except ldb.LdbError as e1:
(num, msg) = e1.args
- self.fail("Failed to reassign RID Master " + msg)
+ self.fail("Failed to reassign RID Master " + msg)
try:
# 2. Perform a RID alloc
ldb_dc2.modify(m)
except ldb.LdbError as e:
(num, msg) = e.args
- self.fail("Failed to restore RID Master " + msg)
+ self.fail("Failed to restore RID Master " + msg)
def test_offline_samba_tool_seized_ridalloc(self):
"""Perform a join against the non-RID manager and then seize the RID Manager role"""