# TODO set debug
def msg(l, text):
print(text)
- #self.set_debug(msg)
+ # self.set_debug(msg)
self.set_utf8_casefold()
'xterm-256color': {
'alternate rows': (colour.xterm_256_colour(39),
colour.xterm_256_colour(45)),
- #'alternate rows': (colour.xterm_256_colour(246),
+ # 'alternate rows': (colour.xterm_256_colour(246),
# colour.xterm_256_colour(247)),
'disconnected': colour.xterm_256_colour(124, bg=True),
'connected': colour.xterm_256_colour(112),
'xterm-256color-heatmap': {
'alternate rows': (colour.xterm_256_colour(171),
colour.xterm_256_colour(207)),
- #'alternate rows': (colour.xterm_256_colour(246),
+ # 'alternate rows': (colour.xterm_256_colour(246),
# colour.xterm_256_colour(247)),
'disconnected': colour.xterm_256_colour(124, bg=True),
'connected': colour.xterm_256_colour(112, bg=True),
'vertical': '│',
'horizontal': '─',
'corner': '╭',
- #'diagonal': '╲',
+ # 'diagonal': '╲',
'diagonal': '·',
- #'missing': '🕱',
+ # 'missing': '🕱',
'missing': '-',
'right_arrow': '←',
},
# here, but using vertex seems to make more sense. That is,
# the docs want this:
#
- #bh = self.get_bridgehead(local_vertex.site, vertex.part, transport,
+ # bh = self.get_bridgehead(local_vertex.site, vertex.part, transport,
# local_vertex.is_black(), detect_failed)
#
# TODO WHY?????
WARN = logger.warning
-#colours for prettier logs
+# colours for prettier logs
from samba.colour import C_NORMAL, REV_RED
from samba.colour import DARK_RED, RED
from samba.colour import DARK_GREEN, GREEN
info_c.interval = max(info_a.interval, info_b.interval)
info_c.options = info_a.options & info_b.options
- #schedule of None defaults to "always"
+ # schedule of None defaults to "always"
if info_a.schedule is None:
info_a.schedule = [0xFF] * 84
if info_b.schedule is None:
if verify or dot_file_dir is not None:
graph_edges = [[x.site.site_dnstr for x in e.vertices]
for e in edge_list]
- #add the reverse edge if not directed.
+ # add the reverse edge if not directed.
graph_edges.extend([x.site.site_dnstr
for x in reversed(e.vertices)]
for e in edge_list if not e.directed)
# Sorted based on internal comparison function of internal edge
edges.sort()
- #XXX expected_num_tree_edges is never used
+ # XXX expected_num_tree_edges is never used
expected_num_tree_edges = 0 # TODO this value makes little sense
count_edges = 0
# SET v.Color to COLOR.RED
# ELSEIF s contains one or more partial replicas of the NC
# SET v.Color to COLOR.BLACK
- #ELSE
+ # ELSE
# SET v.Color to COLOR.WHITE
# set to minimum (no replica)
i_idx = j_idx
t_time = 0
- #XXX doc says current time < c.timeLastSyncSuccess - f
+ # XXX doc says current time < c.timeLastSyncSuccess - f
# which is true only if f is negative or clocks are wrong.
# f is not negative in the default case (2 hours).
elif self.nt_now - cursor.last_sync_success > f:
dsa_opts |= self.option_map[flag]
else:
dsa_opts &= ~self.option_map[flag]
- #save new options
+ # save new options
m = ldb.Message()
m.dn = ldb.Dn(self.samdb, ntds_dn)
m["options"] = ldb.MessageElement(str(dsa_opts), ldb.FLAG_MOD_REPLACE, "options")
"""Add new ace explicitly."""
desc = self.read_descriptor(samdb, object_dn)
desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
- #TODO add bindings for descriptor manipulation and get rid of this
+ # TODO add bindings for descriptor manipulation and get rid of this
desc_aces = re.findall("\(.*?\)", desc_sddl)
for ace in desc_aces:
if ("ID" in ace):
m.dn = ldb.Dn(samdb, self.schema_dn)
else:
raise CommandError("Invalid FSMO role.")
- #first try to transfer to avoid problem if the owner is still active
+ # first try to transfer to avoid problem if the owner is still active
seize = False
master_owner = get_fsmo_roleowner(samdb, m.dn, role)
# if there is a different owner
try:
transfer_role(self.outf, role, samdb)
except:
- #transfer failed, use the big axe...
+ # transfer failed, use the big axe...
seize = True
self.message("Transfer unsuccessful, seizing...")
else:
m.dn = ldb.Dn(samdb, self.forestdns_dn)
else:
raise CommandError("Invalid FSMO role.")
- #first try to transfer to avoid problem if the owner is still active
+ # first try to transfer to avoid problem if the owner is still active
seize = False
master_owner = get_fsmo_roleowner(samdb, m.dn, role)
if master_owner is not None:
transfer_dns_role(self.outf, sambaopts, credopts, role,
samdb)
except:
- #transfer failed, use the big axe...
+ # transfer failed, use the big axe...
seize = True
self.message("Transfer unsuccessful, seizing...")
else:
expression="(objectClass=nTDSConnection)",
attrs=['fromServer'],
# XXX can't be critical for ldif test
- #controls=["search_options:1:2"],
+ # controls=["search_options:1:2"],
controls=["search_options:0:2"],
)
if lp is None:
lp = samba.param.LoadParm()
- #Load non-existent file
+ # Load non-existent file
if os.path.exists(smbconf):
lp.load(smbconf)
# easily kill it
self.slapd_provision_command.append("-d0")
- #the command for the final run is the normal script
+ # the command for the final run is the normal script
self.slapd_command = \
[os.path.join(self.ldapdir,
"slapd-" + self.ldap_instance, "start-slapd")]
for rserver in rootservers:
record = [ndr_pack(ARecord(rootservers[rserver], serial=0, ttl=0, rank=dnsp.DNS_RANK_ROOT_HINT))]
# Add AAAA record as well (How does W2K* add IPv6 records?)
- #if rserver in rootservers_v6:
+ # if rserver in rootservers_v6:
# record.append(ndr_pack(AAAARecord(rootservers_v6[rserver], serial=0, ttl=0)))
msg = ldb.Message(ldb.Dn(samdb, "DC=%s,%s" % (rserver, container_dn)))
msg["objectClass"] = ["top", "dnsNode"]
:param autofill: Create DNS records (using fixed template)
"""
- ##### Set up DC=DomainDnsZones,<DOMAINDN>
+ # Set up DC=DomainDnsZones,<DOMAINDN>
# Add rootserver records
if add_root:
add_rootservers(samdb, domaindn, "DC=DomainDnsZones")
dnsdomain, hostname, hostip, hostip6)
if fill_level != FILL_SUBDOMAIN:
- ##### Set up DC=ForestDnsZones,<FORESTDN>
+ # Set up DC=ForestDnsZones,<FORESTDN>
# Add _msdcs record
add_msdcs_record(samdb, forestdn, "DC=ForestDnsZones", dnsforest)
continue
value = getattr(interface, n)
if isinstance(value, str):
- #print "%s=\"%s\"" % (n, value)
+ # print "%s=\"%s\"" % (n, value)
pass
elif isinstance(value, int) or isinstance(value, long):
- #print "%s=%d" % (n, value)
+ # print "%s=%d" % (n, value)
pass
elif isinstance(value, type):
try:
self.assertIsNone(fn(*self.complete_graph))
def test_graph_connected_under_vertex_failures(self):
- #XXX no tests to distinguish this from the edge_failures case
+ # XXX no tests to distinguish this from the edge_failures case
fn = verify_graph_connected_under_vertex_failures
self.assertGraphError(fn, *self.line)
r = subprocess.call([dot, '-Tcanon', ffn])
self.assertEqual(r, 0)
- #even if dot is not there, at least check the file is non-empty
+ # even if dot is not there, at least check the file is non-empty
size = os.stat(ffn).st_size
self.assertNotEqual(size, 0)
files.append(ffn)
backend_obj.wrap_setxattr(dbname,
self.tempf, "system.fake_access_acl", b"")
- #however, as this is direct DB access, we do not notice it
+ # however, as this is direct DB access, we do not notice it
facl = getntacl(self.lp, self.tempf, direct_db_access=True)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(acl, facl.as_sddl(anysid))
backend_obj.wrap_setxattr(dbname,
self.tempf, "system.fake_access_acl", b"")
- #the hash would break, and we return an ACL based only on the mode, except we set the ACL using the 'ntvfs' mode that doesn't include a hash
+ # the hash would break, and we return an ACL based only on the mode, except we set the ACL using the 'ntvfs' mode that doesn't include a hash
facl = getntacl(self.lp, self.tempf)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(acl, facl.as_sddl(anysid))
backend_obj.wrap_setxattr(dbname,
self.tempf, "system.fake_access_acl", b"")
- #the hash will break, and we return an ACL based only on the mode
+ # the hash will break, and we return an ACL based only on the mode
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid))
# Checking for existence of record (local || remote)
msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
- #print "got %d replies" % len(msg)
+ # print "got %d replies" % len(msg)
self.assertEquals(len(msg), 1) # TODO: should check with more records
self.assertEquals(str(msg[0]["cn"]), "Niemand")
self.assertEquals(str(msg[0]["unixName"]), "bin")
# TODO:
# Using the SID directly in the parse tree leads to conversion
# errors, letting the search fail with no results.
- #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
+ # res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
self.assertEquals(len(res), 4)
res = sorted(res, key=attrgetter('dn'))
# Note that Xs "objectSid" seems to be fine in the previous search for
# "objectSid"...
- #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
- #print len(res) + " results found"
- #for i in range(len(res)):
+ # res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
+ # print len(res) + " results found"
+ # for i in range(len(res)):
# for (obj in res[i]) {
# print obj + ": " + res[i][obj]
# }
stdout=subprocess.PIPE).communicate()[0]
domsid = domsid.split(' ')[0]
-#print domain
-#print domsid
+# print domain
+# print domsid
sids = [domsid + '-512', 'S-1-5-32-545', domsid + '-513', 'S-1-1-0', 'S-1-3-1', 'S-1-5-1']
plantestsuite("samba3.blackbox.net_usershare", "fileserver:local", [os.path.join(samba3srcdir, "script/tests/test_net_usershare.sh"), '$SERVER', '$SERVER_IP', '$USERNAME', '$PASSWORD', smbclient3])
-#TODO encrypted against member, with member creds, and with DC creds
+# TODO encrypted against member, with member creds, and with DC creds
plantestsuite("samba3.blackbox.net.misc", "nt4_dc:local",
[os.path.join(samba3srcdir, "script/tests/test_net_misc.sh"),
scriptdir, "$SMB_CONF_PATH", net, configuration])
plantestsuite("samba3.async_req", "nt4_dc",
[os.path.join(samba3srcdir, "script/tests/test_async_req.sh")])
-#smbtorture4 tests
+# smbtorture4 tests
base = ["base.attr", "base.charset", "base.chkpath", "base.createx_access", "base.defer_open", "base.delaywrite", "base.delete",
"base.deny1", "base.deny2", "base.deny3", "base.denydos", "base.dir1", "base.dir2",
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
self.addCleanup(self.delete_admin_connection)
- #used for anonymous login
+ # used for anonymous login
self.creds_tmp = Credentials()
self.creds_tmp.set_username("")
self.creds_tmp.set_password("")
del self.sd_utils
del self.ldb_admin
-#tests on ldap add operations
+# tests on ldap add operations
class AclAddTests(AclTests):
def setUp(self):
else:
self.fail()
-#tests on ldap modify operations
+# tests on ldap modify operations
class AclModifyTests(AclTests):
def setUp(self):
changetype: modify
add: Member
Member: """ + self.get_user_dn(self.user_with_sm)
-#the user has no rights granted, this should fail
+# the user has no rights granted, this should fail
try:
self.ldb_user2.modify_ldif(ldif)
except LdbError as e11:
# This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
self.fail()
-#grant self-membership, should be able to add himself
+# grant self-membership, should be able to add himself
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
% ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
-#but not other users
+# but not other users
ldif = """
dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
changetype: modify
Member: """ + self.get_user_dn(self.user_with_sm) + """
Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
-#grant self-membership, should be able to add himself but not others at the same time
+# grant self-membership, should be able to add himself but not others at the same time
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
def test_modify_u7(self):
"""13 User with WP modifying Member"""
-#a second user is given write property permission
+# a second user is given write property permission
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
mod = "(A;;WP;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
else:
self.fail()
-#enable these when we have search implemented
+# enable these when we have search implemented
class AclSearchTests(AclTests):
def setUp(self):
anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
self.assertEquals(len(res), 1)
- #verify some of the attributes
- #don't care about values
+ # verify some of the attributes
+ # don't care about values
self.assertTrue("ldapServiceName" in res[0])
self.assertTrue("namingContexts" in res[0])
self.assertTrue("isSynchronized" in res[0])
self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
- #regular users must see only ou1 and ou2
+ # regular users must see only ou1 and ou2
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 2)
res_list = [x["dn"] for x in res if x["dn"] in ok_list]
self.assertEquals(sorted(res_list), sorted(ok_list))
- #these users should see all ous
+ # these users should see all ous
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 6)
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
- #this user should see all ous
+ # this user should see all ous
res_list = [x["dn"] for x in res if x["dn"] in self.full_list]
self.assertEquals(sorted(res_list), sorted(self.full_list))
- #these users should see ou1, 2, 5 and 6 but not 3 and 4
+ # these users should see ou1, 2, 5 and 6 but not 3 and 4
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
ok_list = [Dn(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn),
Dn(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
Dn(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
- #should not see ou3 and ou4, but should see ou5 and ou6
+ # should not see ou3 and ou4, but should see ou5 and ou6
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_SUBTREE)
self.assertEquals(len(res), 4)
res_list = list(res[0].keys())
self.assertEquals(res_list, ok_list)
- #give read property on ou and assert user can only see dn and ou
+ # give read property on ou and assert user can only see dn and ou
mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res_list = list(res[0].keys())
self.assertEquals(sorted(res_list), sorted(ok_list))
- #give read property on Public Information and assert user can see ou and other members
+ # give read property on Public Information and assert user can see ou and other members
mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
scope=SCOPE_SUBTREE)
- #nothing should be returned as ou is not accessible
+ # nothing should be returned as ou is not accessible
self.assertEquals(len(res), 0)
- #give read property on ou and assert user can only see dn and ou
+ # give read property on ou and assert user can only see dn and ou
mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
res_list = list(res[0].keys())
self.assertEquals(sorted(res_list), sorted(ok_list))
- #give read property on Public Information and assert user can see ou and other members
+ # give read property on Public Information and assert user can see ou and other members
mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
self.assert_search_on_attr(str(ou1_dn), self.ldb_admin, attr,
expected_list=self.full_list)
-#tests on ldap delete operations
+
+# tests on ldap delete operations
class AclDeleteTests(AclTests):
def setUp(self):
else:
self.fail()
-#tests on ldap rename operations
+# tests on ldap rename operations
class AclRenameTests(AclTests):
def setUp(self):
self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
else:
self.fail()
- #add an allow ace so we can delete this ou
+ # add an allow ace so we can delete this ou
mod = "(A;;DC;;;DA)"
self.sd_utils.dacl_add_ace(ou1_dn, mod)
-#tests on Control Access Rights
+# tests on Control Access Rights
class AclCARTests(AclTests):
def setUp(self):
def test_change_password1(self):
"""Try a password change operation without any CARs given"""
- #users have change password by default - remove for negative testing
+ # users have change password by default - remove for negative testing
desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
sddl = desc.as_sddl(self.domain_sid)
sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
def test_change_password7(self):
"""Try a password change operation without any CARs given"""
- #users have change password by default - remove for negative testing
+ # users have change password by default - remove for negative testing
desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
sddl = desc.as_sddl(self.domain_sid)
self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
- #first change our own password
+ # first change our own password
self.ldb_user2.modify_ldif("""
dn: """ + self.get_user_dn(self.user_with_pc) + """
changetype: modify
add: unicodePwd
unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')).decode('utf8') + """
""")
- #then someone else's
+ # then someone else's
self.ldb_user2.modify_ldif("""
dn: """ + self.get_user_dn(self.user_with_wp) + """
changetype: modify
def setUp(self):
super(AclExtendedTests, self).setUp()
- #regular user, will be the creator
+ # regular user, will be the creator
self.u1 = "ext_u1"
- #regular user
+ # regular user
self.u2 = "ext_u2"
- #admin user
+ # admin user
self.u3 = "ext_u3"
self.ldb_admin.newuser(self.u1, self.user_pass)
self.ldb_admin.newuser(self.u2, self.user_pass)
del self.ldb_user3
def test_ntSecurityDescriptor(self):
- #create empty ou
+ # create empty ou
self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
- #give u1 Create children access
+ # give u1 Create children access
mod = "(A;;CC;;;%s)" % str(self.user_sid1)
self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
mod = "(A;;LC;;;%s)" % str(self.user_sid2)
self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
- #create a group under that, grant RP to u2
+ # create a group under that, grant RP to u2
self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
mod = "(A;;RP;;;%s)" % str(self.user_sid2)
self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
- #u2 must not read the descriptor
+ # u2 must not read the descriptor
res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
SCOPE_BASE, None, ["nTSecurityDescriptor"])
self.assertNotEqual(len(res), 0)
self.assertFalse("nTSecurityDescriptor" in res[0].keys())
- #grant RC to u2 - still no access
+ # grant RC to u2 - still no access
mod = "(A;;RC;;;%s)" % str(self.user_sid2)
self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
SCOPE_BASE, None, ["nTSecurityDescriptor"])
self.assertNotEqual(len(res), 0)
self.assertFalse("nTSecurityDescriptor" in res[0].keys())
- #u3 is member of administrators group, should be able to read sd
+ # u3 is member of administrators group, should be able to read sd
res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
SCOPE_BASE, None, ["nTSecurityDescriptor"])
self.assertEqual(len(res), 1)
self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
(ctx.ntds_guid, ctx.dnsdomain))
- #the following spns do not match the restrictions and should fail
+ # the following spns do not match the restrictions and should fail
try:
self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
(ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
(self.computername, self.dcctx.dnsdomain))
self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
- #user has neither WP nor Validated-SPN, access denied expected
+ # user has neither WP nor Validated-SPN, access denied expected
try:
self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
except LdbError as e45:
mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
self.sd_utils.dacl_add_ace(self.computerdn, mod)
- #grant Validated-SPN and check which values are accepted
- #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
+ # grant Validated-SPN and check which values are accepted
+ # see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
# for regular computer objects we shouldalways get constraint violation
self.user_pass = samba.generate_random_password(12, 16)
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
- #used for anonymous login
+ # used for anonymous login
print("baseDN: %s" % self.base_dn)
def get_user_dn(self, name):
return ldb_target
-#tests on ldap add operations
+# tests on ldap add operations
class SimpleDirsyncTests(DirsyncBaseTests):
def setUp(self):
except Exception:
pass
- #def test_dirsync_errors(self):
+ # def test_dirsync_errors(self):
def test_dirsync_supported(self):
"""Test the basic of the dirsync is supported"""
delete_force(self.ldb, "cn=ldaptestgroup,cn=users," + self.base_dn)
- #only write is allowed with NC_HEAD for originating updates
+ # only write is allowed with NC_HEAD for originating updates
try:
self.ldb.add({
"dn": "cn=ldaptestuser2,cn=users," + self.base_dn,
################################################################################################
- ## Tests for DOMAIN
+ # Tests for DOMAIN
# Default descriptor tests #####################################################################
def setUp(self):
super(OwnerGroupDescriptorTests, self).setUp()
self.deleteAll()
- ### Create users
+ # Create users
# User 1 - Enterprise Admins
self.ldb_admin.newuser("testuser1", "samba123@")
# User 2 - Domain Admins
desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
res = re.search("(O:.*G:.*?)D:", desc_sddl).group(1)
self.assertEqual(self.results[self.DS_BEHAVIOR][self._testMethodName[5:]] % str(user_sid), res)
- #this fails, research why
+ # this fails, research why
#self.check_modify_inheritance(_ldb, object_dn)
def test_104(self):
self.ldb_admin.create_ou(object_dn)
desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
- ## Tests for SCHEMA
+ # Tests for SCHEMA
# Defalt descriptor tests ##################################################################
self.check_user_belongs(self.get_users_domain_dn(user_name), [])
# Open Ldb connection with the tested user
_ldb = self.get_ldb_connection(user_name, "samba123@")
- #Change Schema partition descriptor
+ # Change Schema partition descriptor
user_sid = self.sd_utils.get_object_sid(self.get_users_domain_dn(user_name))
mod = "(A;CI;WDCC;;;AU)"
self.sd_utils.dacl_add_ace(self.schema_dn, mod)
self.check_user_belongs(self.get_users_domain_dn(user_name), ["Enterprise Admins", "Domain Admins"])
# Open Ldb connection with the tested user
_ldb = self.get_ldb_connection(user_name, "samba123@")
- #Change Schema partition descriptor
+ # Change Schema partition descriptor
mod = "(A;CI;WDCC;;;AU)"
self.sd_utils.dacl_add_ace(self.schema_dn, mod)
# Create example Schema class
res = re.search("(O:.*G:.*?)D:", desc_sddl).group(1)
self.assertEqual("O:DAG:DA", res)
- ## Tests for CONFIGURATION
+ # Tests for CONFIGURATION
# Defalt descriptor tests ##################################################################
def setUp(self):
super(RightsAttributesTests, self).setUp()
self.deleteAll()
- ### Create users
+ # Create users
# User 1
self.ldb_admin.newuser("testuser_attr", "samba123@")
# User 2, Domain Admins
self.ldb_admin.create_ou(object_dn)
print(self.get_users_domain_dn("testuser_attr"))
user_sid = self.sd_utils.get_object_sid(self.get_users_domain_dn("testuser_attr"))
- #give testuser1 read access so attributes can be retrieved
+ # give testuser1 read access so attributes can be retrieved
mod = "(A;CI;RP;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
_ldb = self.get_ldb_connection("testuser_attr", "samba123@")
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["sDRightsEffective"])
- #user whould have no rights at all
+ # user whould have no rights at all
self.assertEquals(len(res), 1)
self.assertEquals(res[0]["sDRightsEffective"][0], "0")
- #give the user Write DACL and see what happens
+ # give the user Write DACL and see what happens
mod = "(A;CI;WD;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["sDRightsEffective"])
- #user whould have DACL_SECURITY_INFORMATION
+ # user whould have DACL_SECURITY_INFORMATION
self.assertEquals(len(res), 1)
self.assertEquals(res[0]["sDRightsEffective"][0], ("%d") % SECINFO_DACL)
- #give the user Write Owners and see what happens
+ # give the user Write Owners and see what happens
mod = "(A;CI;WO;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["sDRightsEffective"])
- #user whould have DACL_SECURITY_INFORMATION, OWNER_SECURITY_INFORMATION, GROUP_SECURITY_INFORMATION
+ # user whould have DACL_SECURITY_INFORMATION, OWNER_SECURITY_INFORMATION, GROUP_SECURITY_INFORMATION
self.assertEquals(len(res), 1)
self.assertEquals(res[0]["sDRightsEffective"][0], ("%d") % (SECINFO_DACL | SECINFO_GROUP | SECINFO_OWNER))
- #no way to grant security privilege bu adding ACE's so we use a memeber of Domain Admins
+ # no way to grant security privilege bu adding ACE's so we use a memeber of Domain Admins
_ldb = self.get_ldb_connection("testuser_attr2", "samba123@")
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["sDRightsEffective"])
- #user whould have DACL_SECURITY_INFORMATION, OWNER_SECURITY_INFORMATION, GROUP_SECURITY_INFORMATION
+ # user whould have DACL_SECURITY_INFORMATION, OWNER_SECURITY_INFORMATION, GROUP_SECURITY_INFORMATION
self.assertEquals(len(res), 1)
self.assertEquals(res[0]["sDRightsEffective"][0], \
("%d") % (SECINFO_DACL | SECINFO_GROUP | SECINFO_OWNER | SECINFO_SACL))
delete_force(self.ldb_admin, object_dn)
self.ldb_admin.create_ou(object_dn)
user_sid = self.sd_utils.get_object_sid(self.get_users_domain_dn("testuser_attr"))
- #give testuser1 read access so attributes can be retrieved
+ # give testuser1 read access so attributes can be retrieved
mod = "(A;CI;RP;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
_ldb = self.get_ldb_connection("testuser_attr", "samba123@")
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["allowedChildClassesEffective"])
- #there should be no allowed child classes
+ # there should be no allowed child classes
self.assertEquals(len(res), 1)
self.assertFalse("allowedChildClassesEffective" in res[0].keys())
- #give the user the right to create children of type user
+ # give the user the right to create children of type user
mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
delete_force(self.ldb_admin, object_dn)
self.ldb_admin.create_ou(object_dn)
user_sid = self.sd_utils.get_object_sid(self.get_users_domain_dn("testuser_attr"))
- #give testuser1 read access so attributes can be retrieved
+ # give testuser1 read access so attributes can be retrieved
mod = "(A;CI;RP;;;%s)" % str(user_sid)
self.sd_utils.dacl_add_ace(object_dn, mod)
_ldb = self.get_ldb_connection("testuser_attr", "samba123@")
res = _ldb.search(base=object_dn, expression="", scope=SCOPE_BASE,
attrs=["allowedAttributesEffective"])
- #there should be no allowed attributes
+ # there should be no allowed attributes
self.assertEquals(len(res), 1)
self.assertFalse("allowedAttributesEffective" in res[0].keys())
- #give the user the right to write displayName and managedBy
+ # give the user the right to write displayName and managedBy
mod2 = "(OA;CI;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
mod = "(OA;CI;WP;0296c120-40da-11d1-a9c0-0000f80367c1;;%s)" % str(user_sid)
# also rights to modify an read only attribute, fromEntry
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
-#tests on sites
+# tests on sites
class SimpleSitesTests(SitesBaseTests):
def test_create_and_delete(self):
except LdbError as e4:
(num, _) = e4.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
- #try to undelete from config to base dn
+ # try to undelete from config to base dn
try:
self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
self.fail()
except LdbError as e5:
(num, _) = e5.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
- #assert undeletion will work in same nc
+ # assert undeletion will work in same nc
self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
start = max(before - 1, 1)
end = max(start + 4, original_n - after + 2)
for offset in range(start, end):
- #if iteration > 2076:
+ # if iteration > 2076:
# return
cookie = get_cookie(controls, original_n)
vlv_search = encode_vlv_control(before=before,
store_utdv = ndr_unpack(drsblobs.replUpToDateVectorBlob, store_utdv_blob)
assert store_dn == dn
- #print "%s" % ndr_print(store_hwm)
- #print "%s" % ndr_print(store_utdv)
+ # print "%s" % ndr_print(store_hwm)
+ # print "%s" % ndr_print(store_utdv)
except Exception:
store_dn = dn
store_hwm = drsuapi.DsReplicaHighWaterMark()
obj_item = obj_item.next_object
continue
- #print '%s' % obj.identifier.dn
+ # print '%s' % obj.identifier.dn
is_deleted = False
for i in range(0, obj.attribute_ctr.num_attributes):
spl = ndr_unpack(drsblobs.supplementalCredentialsBlob, attr_val)
- #print '%s' % ndr_print(spl)
+ # print '%s' % ndr_print(spl)
cleartext_hex = None
krb5_old_raw = binascii.a2b_hex(krb5_old_hex)
krb5_old = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb5_old_raw, allow_remaining=True)
- #print '%s' % ndr_print(krb5_old)
+ # print '%s' % ndr_print(krb5_old)
krb5_new_hex = None
krb5_new_raw = binascii.a2b_hex(krb5_new_hex)
krb5_new = ndr_unpack(drsblobs.package_PrimaryKerberosBlob, krb5_new_raw, allow_remaining=True)
- #print '%s' % ndr_print(krb5_new)
+ # print '%s' % ndr_print(krb5_new)
obj_item = obj_item.next_object
store_utdv_ctr.cursors = ctr.uptodateness_vector.cursors
store_utdv.ctr = store_utdv_ctr
- #print "%s" % ndr_print(store_hwm)
- #print "%s" % ndr_print(store_utdv)
+ # print "%s" % ndr_print(store_hwm)
+ # print "%s" % ndr_print(store_utdv)
store_hwm_blob = ndr_pack(store_hwm)
store_utdv_blob = ndr_pack(store_utdv)
plansmbtorture4testsuite(t, env, ["%s:$SERVER[%s]" % (transport, bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN'], "samba4.%s on %s with %s" % (t, transport, bindoptions))
plansmbtorture4testsuite('rpc.samba3-sharesec', env, ["%s:$SERVER[%s]" % (transport, bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN', '--option=torture:share=tmp'], "samba4.rpc.samba3.sharesec on %s with %s" % (transport, bindoptions))
-#Plugin S4 DC tests (confirms named pipe auth forwarding). This can be expanded once kerberos is supported in the plugin DC
+# Plugin S4 DC tests (confirms named pipe auth forwarding). This can be expanded once kerberos is supported in the plugin DC
#
for bindoptions in ["seal,padcheck"] + validate_list + ["bigendian"]:
for t in ncacn_np_tests:
transports = ["ncacn_np", "ncacn_ip_tcp"]
-#Kerberos varies between functional levels, so it is important to check this on all of them
+# Kerberos varies between functional levels, so it is important to check this on all of them
for env in ["ad_dc_ntvfs", "fl2000dc", "fl2003dc", "fl2008r2dc", "ad_dc"]:
transport = "ncacn_np"
plansmbtorture4testsuite('rpc.pac', env, ["%s:$SERVER[]" % (transport, ), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN'], "samba4.rpc.pac on %s" % (transport,))
smb2_s3only = ["smb2.change_notify_disabled", "smb2.dosmode", "smb2.credits", "smb2.kernel-oplocks"]
smb2 = [x for x in smbtorture4_testsuites("smb2.") if x not in smb2_s3only]
-#The QFILEINFO-IPC test needs to be on ipc$
+# The QFILEINFO-IPC test needs to be on ipc$
raw = filter(lambda x: "raw.qfileinfo.ipc" not in x, smbtorture4_testsuites("raw."))
base = smbtorture4_testsuites("base.")
# Local tests
for t in smbtorture4_testsuites("local."):
- #The local.resolve test needs a name to look up using real system (not emulated) name routines
+ # The local.resolve test needs a name to look up using real system (not emulated) name routines
plansmbtorture4testsuite(t, "none", "ncalrpc:localhost")
# Confirm these tests with the system iconv too
# Local tests
for t in smbtorture4_testsuites("dlz_bind9."):
- #The dlz_bind9 tests needs to look at the DNS database
+ # The dlz_bind9 tests needs to look at the DNS database
plansmbtorture4testsuite(t, "chgdcpass:local", ["ncalrpc:$SERVER", '-U$USERNAME%$PASSWORD'])
planpythontestsuite("nt4_dc", "samba.tests.libsmb_samba_internal", py3_compatible=True);
drsuapi.DRSUAPI_DS_NAME_FORMAT_CANONICAL_EX,
drsuapi.DRSUAPI_DS_NAME_FORMAT_SERVICE_PRINCIPAL,
# We currently don't support this
- #drsuapi.DRSUAPI_DS_NAME_FORMAT_SID_OR_SID_HISTORY,
+ # drsuapi.DRSUAPI_DS_NAME_FORMAT_SID_OR_SID_HISTORY,
# This format is not supported by Windows (or us)
- #drsuapi.DRSUAPI_DS_NAME_FORMAT_DNS_DOMAIN,
+ # drsuapi.DRSUAPI_DS_NAME_FORMAT_DNS_DOMAIN,
}
def tearDown(self):
expected_links=[ou2_managedBy_dc3, dc3_managedBy_ou1])
# Can fail against Windows due to equal precedence of dc3, cn3
- #self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ # self._check_replication([self.ou,ou1,ou2,dc3,cn3],
# drsuapi.DRSUAPI_DRS_WRIT_REP|
# drsuapi.DRSUAPI_DRS_GET_ANC,
# expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1])
expected_links=[ou2_managedBy_dc3, dc3_managedBy_ou1, dc3_managedBy_ou2])
# Can fail against Windows due to equal precedence of dc3, cn3
- #self._check_replication([self.ou,ou1,ou2,dc3,cn3],
+ # self._check_replication([self.ou,ou1,ou2,dc3,cn3],
# drsuapi.DRSUAPI_DRS_WRIT_REP|
# drsuapi.DRSUAPI_DRS_GET_ANC,
# expected_links=[ou2_managedBy_dc3,dc3_managedBy_ou1,dc3_managedBy_ou2])
child.close()
i = child.exitstatus
if wait_for_fail:
- #wait for timeout or fail
+ # wait for timeout or fail
if i == None or i > 0:
return
else: