# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests the possibleInferiors generation in the schema_fsmo ldb module"""
import optparse
def test_class(db, classinfo, oc):
"""test to see if one objectclass returns the correct possibleInferiors"""
- print "test: objectClass.%s" % oc
+ print("test: objectClass.%s" % oc)
poss1 = possible_inferiors_search(db, oc)
poss2 = possible_inferiors_constructed(db, classinfo, oc)
if poss1 != poss2:
- print "failure: objectClass.%s [" % oc
- print "Returned incorrect list for objectclass %s" % oc
- print "search: %s" % poss1
- print "constructed: %s" % poss2
+ print("failure: objectClass.%s [" % oc)
+ print("Returned incorrect list for objectclass %s" % oc)
+ print("search: %s" % poss1)
+ print("constructed: %s" % poss2)
for i in range(0,min(len(poss1),len(poss2))):
- print "%30s %30s" % (poss1[i], poss2[i])
- print "]"
+ print("%30s %30s" % (poss1[i], poss2[i]))
+ print("]")
sys.exit(1)
else:
- print "success: objectClass.%s" % oc
+ print("success: objectClass.%s" % oc)
def get_object_classes(db):
"""return a list of all object classes"""
else:
test_class(db,classinfo,objectclass)
-print "Lists match OK"
+print("Lists match OK")
# -*- coding: utf-8 -*-
# This is unit with tests for LDAP access checks
+from __future__ import print_function
import optparse
import sys
import base64
self.creds_tmp.set_domain(creds.get_domain())
self.creds_tmp.set_realm(creds.get_realm())
self.creds_tmp.set_workstation(creds.get_workstation())
- print "baseDN: %s" % self.base_dn
+ print("baseDN: %s" % self.base_dn)
def get_user_dn(self, name):
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
"""5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
# First test object -- User
- print "Testing modify on User object"
+ print("Testing modify on User object")
self.ldb_admin.newuser("test_modify_user1", self.user_pass)
self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
ldif = """
expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Second test object -- Group
- print "Testing modify on Group object"
+ print("Testing modify on Group object")
self.ldb_admin.newgroup("test_modify_group1",
grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
self.assertEqual(res[0]["displayName"][0], "test_changed")
# Third test object -- Organizational Unit
- print "Testing modify on OU object"
+ print("Testing modify on OU object")
#delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
"""6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
# First test object -- User
- print "Testing modify on User object"
+ print("Testing modify on User object")
#delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
self.ldb_admin.newuser("test_modify_user1", self.user_pass)
self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
# This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
self.fail()
# Second test object -- Group
- print "Testing modify on Group object"
+ print("Testing modify on Group object")
self.ldb_admin.newgroup("test_modify_group1",
grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
# This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
self.fail()
# Second test object -- Organizational Unit
- print "Testing modify on OU object"
+ print("Testing modify on OU object")
self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
ldif = """
def test_modify_u3(self):
"""7 Modify one attribute as you have no what so ever rights granted"""
# First test object -- User
- print "Testing modify on User object"
+ print("Testing modify on User object")
self.ldb_admin.newuser("test_modify_user1", self.user_pass)
# Modify on attribute you do not have rights for granted
ldif = """
self.fail()
# Second test object -- Group
- print "Testing modify on Group object"
+ print("Testing modify on Group object")
self.ldb_admin.newgroup("test_modify_group1",
grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
# Modify on attribute you do not have rights for granted
self.fail()
# Second test object -- Organizational Unit
- print "Testing modify on OU object"
+ print("Testing modify on OU object")
#delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
# Modify on attribute you do not have rights for granted
self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
- print "Testing correct behavior on nonaccessible search base"
+ print("Testing correct behavior on nonaccessible search base")
try:
self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
scope=SCOPE_BASE)
del self.ldb_user1
def replace_spn(self, _ldb, dn, spn):
- print "Setting spn %s on %s" % (spn, dn)
+ print("Setting spn %s on %s" % (spn, dn))
res = self.ldb_admin.search(dn, expression="(objectClass=*)",
scope=SCOPE_BASE, attrs=["servicePrincipalName"])
if "servicePrincipalName" in res[0].keys():
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
+
import optparse
import sys
sys.path.insert(0, 'bin/python')
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d %s took %s' % (i, expression,
- time.time() - t)
+ print('%d %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_indexed_search(self):
expressions = ['(objectclass=group)',
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d runs %s took %s' % (i, expression,
- time.time() - t)
+ print('%d runs %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_base_search(self):
for dn in [self.base_dn, self.ou, self.ou_users,
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d runs %s took %s' % (i, expression,
- time.time() - t)
+ print('%d runs %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_complex_search(self, n=100):
classes = ['samaccountname', 'objectCategory', 'dn', 'member']
try:
self.ldb.modify(m)
except LdbError as e:
- print e
- print m
+ print(e)
+ print(m)
def _test_remove_some_links(self, n=(LINK_BATCH_SIZE // 2)):
victims = random.sample(list(self.state.active_links), n)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
+
import optparse
import sys
sys.path.insert(0, 'bin/python')
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d %s took %s' % (i, expression,
- time.time() - t)
+ print('%d %s took %s' % (i, expression,
+ time.time() - t), file=sys.stderr)
def _test_indexed_search(self):
expressions = ['(objectclass=group)',
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d runs %s took %s' % (i, expression,
- time.time() - t)
+ print('%d runs %s took %s' % (i, expression,
+ time.time() - t), file=sys.stderr)
def _test_add_many_users(self, n=BATCH_SIZE):
s = self.state.next_user_id
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
+
import optparse
import sys
sys.path.insert(0, 'bin/python')
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d %s took %s' % (i, expression,
- time.time() - t)
+ print('%d %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_indexed_search(self):
expressions = ['(objectclass=group)',
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d runs %s took %s' % (i, expression,
- time.time() - t)
+ print('%d runs %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_complex_search(self):
classes = ['samaccountname', 'objectCategory', 'dn', 'member']
'(', n2, c2, o2, v2,
'))' if n2 else ')',
')'])
- print expression
+ print(expression)
self.ldb.search(self.ou,
expression=expression,
scope=SCOPE_SUBTREE,
expression=expression,
scope=SCOPE_SUBTREE,
attrs=['cn'])
- print >> sys.stderr, '%d runs %s took %s' % (i, expression,
- time.time() - t)
+ print('%d runs %s took %s' % (i, expression,
+ time.time() - t),
+ file=sys.stderr)
def _test_add_many_users(self, n=BATCH_SIZE):
s = self.state.next_user_id
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
import optparse
import sys
import os
self.configuration_dn = self.ldb.get_config_basedn().get_linearized()
def search_guid(self, guid):
- print "SEARCH by GUID %s" % self.GUID_string(guid)
+ print("SEARCH by GUID %s" % self.GUID_string(guid))
res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
scope=SCOPE_BASE, controls=["show_deleted:1"])
return res[0]
def search_dn(self,dn):
- print "SEARCH by DN %s" % dn
+ print("SEARCH by DN %s" % dn)
res = self.ldb.search(expression="(objectClass=*)",
base=dn,
super(BasicDeleteTests, self).setUp()
def del_attr_values(self, delObj):
- print "Checking attributes for %s" % delObj["dn"]
+ print("Checking attributes for %s" % delObj["dn"])
self.assertEquals(delObj["isDeleted"][0],"TRUE")
self.assertTrue(not("objectCategory" in delObj))
self.assertTrue(not("sAMAccountType" in delObj))
def preserved_attributes_list(self, liveObj, delObj):
- print "Checking for preserved attributes list"
+ print("Checking for preserved attributes list")
preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
"flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
self.assertTrue(a in delObj)
def check_rdn(self, liveObj, delObj, rdnName):
- print "Checking for correct rDN"
+ print("Checking for correct rDN")
rdn=liveObj[rdnName][0]
rdn2=delObj[rdnName][0]
name2=delObj["name"][0]
self.assertEquals(name2, dn_rdn)
def delete_deleted(self, ldb, dn):
- print "Testing the deletion of the already deleted dn %s" % dn
+ print("Testing the deletion of the already deleted dn %s" % dn)
try:
ldb.delete(dn)
def test_delete_protection(self):
"""Delete protection tests"""
- print self.base_dn
+ print(self.base_dn)
delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
def test_all(self):
"""Basic delete tests"""
- print self.base_dn
+ print(self.base_dn)
# user current time in ms to make unique objects
import time
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
import optparse
import sys
sys.path.insert(0, "bin/python")
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
#used for anonymous login
- print "baseDN: %s" % self.base_dn
+ print("baseDN: %s" % self.base_dn)
def get_user_dn(self, name):
return "CN=%s,CN=Users,%s" % (name, self.base_dn)
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
try:
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
try:
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
except LdbError as l:
- print l
+ print(l)
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
def test_dirsync_attributes(self):
# Check that reasking the same question but with an updated cookie
# didn't return any results.
- print control1
+ print(control1)
res = self.ldb_admin.search(self.base_dn,
expression="(name=testgroup)",
controls=[control1])
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
import optparse
import sys
import time
res = ldb.search(self.base_dn, expression="(&(cn=ldaptestgroup2)(objectClass=group))", scope=SCOPE_SUBTREE, attrs=attrs, controls=["extended_dn:1:1"])
self.assertEquals(len(res), 1, "Could not find (&(cn=ldaptestgroup2)(objectClass=group))")
- print res[0]["member"]
+ print(res[0]["member"])
memberUP = []
for m in res[0]["member"]:
memberUP.append(m.upper())
- print ("<GUID=" + ldb.schema_format_value("objectGUID", ldaptestuser2_guid) + ">;<SID=" + ldb.schema_format_value("objectSid", ldaptestuser2_sid) + ">;CN=ldaptestuser2,CN=Users," + self.base_dn).upper()
+ print(("<GUID=" + ldb.schema_format_value("objectGUID", ldaptestuser2_guid) + ">;<SID=" + ldb.schema_format_value("objectSid", ldaptestuser2_sid) + ">;CN=ldaptestuser2,CN=Users," + self.base_dn).upper())
self.assertTrue(("<GUID=" + ldb.schema_format_value("objectGUID", ldaptestuser2_guid) + ">;<SID=" + ldb.schema_format_value("objectSid", ldaptestuser2_sid) + ">;CN=ldaptestuser2,CN=Users," + self.base_dn).upper() in memberUP)
+from __future__ import print_function
import optparse
import sys
import time
#self.assertTrue("msDS-IntId" in ldb_msg, "msDS-IntId expected on: %s" % ldb_msg.dn)
if "msDS-IntId" not in ldb_msg:
count = count + 1
- print "%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"])
+ print("%3d warning: msDS-IntId expected on: %-30s %s" % (count, ldb_msg["attributeID"], ldb_msg["cn"]))
else:
self.assertTrue("msDS-IntId" not in ldb_msg)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Originally based on ./sam.py
+from __future__ import print_function
import optparse
import sys
import os
try:
self.samdb.delete(self.ou, ['tree_delete:1'])
except ldb.LdbError as e:
- print "tried deleting %s, got error %s" % (self.ou, e)
+ print("tried deleting %s, got error %s" % (self.ou, e))
self.samdb.add({'objectclass': 'organizationalUnit',
'dn': self.ou})
results = sorted(results)
if expected != results:
- print msg
- print "expected %s" % expected
- print "received %s" % results
+ print(msg)
+ print("expected %s" % expected)
+ print("received %s" % results)
self.assertEqual(results, expected)
def test_la_backlinks_reveal(self):
if opts.no_reveal_internals:
- print 'skipping because --no-reveal-internals'
+ print('skipping because --no-reveal-internals')
return
self._test_la_backlinks(True)
def test_la_backlinks_delete_group_reveal(self):
if opts.no_reveal_internals:
- print 'skipping because --no-reveal-internals'
+ print('skipping because --no-reveal-internals')
return
self._test_la_backlinks_delete_group(True)
def test_la_links_delete_link_reveal(self):
if opts.no_reveal_internals:
- print 'skipping because --no-reveal-internals'
+ print('skipping because --no-reveal-internals')
return
self._test_la_links_delete_link_reveal()
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
import optparse
import sys
import os
except LdbError as e9:
(num, _) = e9.args
if num != ERR_UNWILLING_TO_PERFORM:
- print "va[%s]" % va
+ print("va[%s]" % va)
self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
try:
except LdbError as e11:
(num, _) = e11.args
if num != ERR_UNWILLING_TO_PERFORM:
- print "va[%s]" % va
+ print("va[%s]" % va)
self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
if not "://" in url:
# Copyright Stefan Metzmacher 2014
#
+from __future__ import print_function
import optparse
import sys
import base64
if use_kerberos == MUST_USE_KERBEROS:
logoncount_relation = 'greater'
lastlogon_relation = 'greater'
- print "Performs a password cleartext change operation on 'userPassword' using Kerberos"
+ print("Performs a password cleartext change operation on 'userPassword' using Kerberos")
else:
logoncount_relation = 'equal'
lastlogon_relation = 'equal'
- print "Performs a password cleartext change operation on 'userPassword' using NTLMSSP"
+ print("Performs a password cleartext change operation on 'userPassword' using NTLMSSP")
if initial_lastlogon_relation is not None:
lastlogon_relation = initial_lastlogon_relation
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
- print "two failed password change"
+ print("two failed password change")
# Wrong old password
try:
def _test_unicodePwd_lockout_with_clear_change(self, creds, other_ldb,
initial_logoncount_relation=None):
- print "Performs a password cleartext change operation on 'unicodePwd'"
+ print("Performs a password cleartext change operation on 'unicodePwd'")
username = creds.get_username()
userpass = creds.get_password()
userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
- print "two failed password change"
+ print("two failed password change")
# Wrong old password
try:
+from __future__ import print_function
import samba
from samba.auth import system_session
(name, res[0][name], res[0].dn))
- print "%s = '%s'" % (name, res[0][name][0])
+ print("%s = '%s'" % (name, res[0][name][0]))
if mode == "present":
return
effective_bad_password_count=None,
msg=None,
badPwdCountOnly=False):
- print '-=' * 36
+ print('-=' * 36)
if msg is not None:
- print "\033[01;32m %s \033[00m\n" % msg
+ print("\033[01;32m %s \033[00m\n" % msg)
attrs = [
"objectSid",
"badPwdCount",
if use_kerberos == MUST_USE_KERBEROS:
logoncount_relation = 'greater'
lastlogon_relation = 'greater'
- print "Performs a lockout attempt against LDAP using Kerberos"
+ print("Performs a lockout attempt against LDAP using Kerberos")
else:
logoncount_relation = 'equal'
lastlogon_relation = 'equal'
- print "Performs a lockout attempt against LDAP using NTLM"
+ print("Performs a lockout attempt against LDAP using NTLM")
# Change password on a connection as another user
res = self._check_account(userdn,
lastLogon = int(res[0]["lastLogon"][0])
firstLogon = lastLogon
lastLogonTimestamp = int(res[0]["lastLogonTimestamp"][0])
- print firstLogon
- print lastLogonTimestamp
+ print(firstLogon)
+ print(lastLogonTimestamp)
self.assertGreater(lastLogon, badPasswordTime)
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
- print "two failed password change"
+ print("two failed password change")
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
# wait for the lockout to end
time.sleep(self.account_lockout_duration + 1)
- print self.account_lockout_duration + 1
+ print(self.account_lockout_duration + 1)
res = self._check_account(userdn,
badPwdCount=3, effective_bad_password_count=0,
use_kerberos = creds.get_kerberos_state()
if use_kerberos == MUST_USE_KERBEROS:
- print "Testing multiple logon with Kerberos"
+ print("Testing multiple logon with Kerberos")
logoncount_relation = 'greater'
lastlogon_relation = 'greater'
else:
- print "Testing multiple logon with NTLM"
+ print("Testing multiple logon with NTLM")
logoncount_relation = 'equal'
lastlogon_relation = 'equal'
lastLogon = int(res[0]["lastLogon"][0])
lastLogonTimestamp = int(res[0]["lastLogonTimestamp"][0])
firstLogon = lastLogon
- print "last logon is %d" % lastLogon
+ print("last logon is %d" % lastLogon)
self.assertGreater(lastLogon, badPasswordTime)
self.assertGreaterEqual(lastLogon, lastLogonTimestamp)
# secured enough (SASL with a minimum of 128 Bit encryption) - consider
# MS-ADTS 3.1.1.3.1.5
+from __future__ import print_function
import optparse
import sys
import base64
"userPassword": ["thatsAcomplPASS1", "thatsAcomplPASS1"] })
def test_empty_passwords(self):
- print "Performs some empty passwords testing"
+ print("Performs some empty passwords testing")
try:
self.ldb.add({
num == ERR_NO_SUCH_ATTRIBUTE) # for Windows
def test_plain_userPassword(self):
- print "Performs testing about the standard 'userPassword' behaviour"
+ print("Performs testing about the standard 'userPassword' behaviour")
# Delete the "dSHeuristics"
self.ldb.set_dsheuristics(None)
self.ldb.set_dsheuristics("000000001")
def test_modify_dsheuristics_userPassword(self):
- print "Performs testing about reading userPassword between dsHeuristic modifies"
+ print("Performs testing about reading userPassword between dsHeuristic modifies")
# Make sure userPassword cannot be read
self.ldb.set_dsheuristics("000000000")
#!/usr/bin/python
# -*- coding: utf-8 -*-
+from __future__ import print_function
import optparse
import sys
import os
except ldb.LdbError as e:
(ecode, emsg) = e.args
if ecode != ldb.ERR_REFERRAL:
- print emsg
+ print(emsg)
self.fail("Adding %s: ldb error: %s %s, wanted referral" %
(o['dn'], ecode, emsg))
else:
except ldb.LdbError as e4:
(ecode, emsg) = e4.args
if ecode != ldb.ERR_REFERRAL:
- print ecode, emsg
+ print(ecode, emsg)
self.fail("Failed to REFER when trying to delete %s" % dn)
else:
m = re.search(r'(ldap://[^>]+)>', emsg)
except ldb.LdbError as e5:
(ecode, emsg) = e5.args
if ecode != ldb.ERR_NO_SUCH_OBJECT:
- print ecode, emsg
+ print(ecode, emsg)
self.fail("Failed to NO_SUCH_OBJECT when trying to delete "
"%s (which does not exist)" % dn)
#!/usr/bin/python
# -*- coding: utf-8 -*-
+from __future__ import print_function
"""Test communication of credentials etc, between an RODC and a RWDC.
How does it work when the password is changed on the RWDC?
kerberos_state = CREDS.get_kerberos_state()
c.set_kerberos_state(kerberos_state)
- print '-' * 73
+ print('-' * 73)
if kerberos_state == MUST_USE_KERBEROS:
- print "we seem to be using kerberos for %s %s" % (username, password)
+ print("we seem to be using kerberos for %s %s" % (username, password))
elif kerberos_state == DONT_USE_KERBEROS:
- print "NOT using kerberos for %s %s" % (username, password)
+ print("NOT using kerberos for %s %s" % (username, password))
else:
- print "kerberos state is %s" % kerberos_state
+ print("kerberos state is %s" % kerberos_state)
c.set_gensec_features(c.get_gensec_features() |
gensec.FEATURE_SEAL)
credstring,
'--server', RWDC,]
- print ' '.join(cmd)
+ print(' '.join(cmd))
subprocess.check_call(cmd)
set_auto_replication(RWDC, False)
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode:
- print "failed with code %s" % p.returncode
- print ' '.join(cmd)
- print "stdout"
- print stdout
- print "stderr"
- print stderr
+ print("failed with code %s" % p.returncode)
+ print(' '.join(cmd))
+ print("stdout")
+ print(stdout)
+ print("stderr")
+ print(stderr)
raise RodcRwdcTestException()
def _change_password(self, user_dn, old_password, new_password):
msDSUserAccountControlComputed=0)
badPasswordTime = int(res[0]["badPasswordTime"][0])
- print "two failed password change"
+ print("two failed password change")
# The wrong password
creds_lockout.set_password("thatsAcomplPASS1x")
# wait for the lockout to end
time.sleep(self.account_lockout_duration + 1)
- print self.account_lockout_duration + 1
+ print(self.account_lockout_duration + 1)
res = self._check_account(userdn,
badPwdCount=3, effective_bad_password_count=0,
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode:
- print "failed with code %s" % p.returncode
- print ' '.join(cmd)
- print "stdout"
- print stdout
- print "stderr"
- print stderr
+ print("failed with code %s" % p.returncode)
+ print(' '.join(cmd))
+ print("stdout")
+ print(stdout)
+ print("stderr")
+ print(stderr)
raise RodcRwdcTestException()
def _check_account_initial(self, dn):
# -*- coding: utf-8 -*-
# This is a port of the original in testprogs/ejs/ldap.js
+from __future__ import print_function
import optparse
import sys
import os
self.ldb = ldb
self.base_dn = ldb.domain_dn()
- print "baseDN: %s\n" % self.base_dn
+ print("baseDN: %s\n" % self.base_dn)
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
delete_force(self.ldb, "cn=ldaptestuser2,cn=users," + self.base_dn)
def test_users_groups(self):
"""This tests the SAM users and groups behaviour"""
- print "Testing users and groups behaviour\n"
+ print("Testing users and groups behaviour\n")
ldb.add({
"dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
def test_sam_attributes(self):
"""Test the behaviour of special attributes of SAM objects"""
- print "Testing the behaviour of special attributes of SAM objects\n"
+ print("Testing the behaviour of special attributes of SAM objects\n")
ldb.add({
"dn": "cn=ldaptestuser,cn=users," + self.base_dn,
def test_primary_group_token_constructed(self):
"""Test the primary group token behaviour (hidden-generated-readonly attribute on groups) and some other constructed attributes"""
- print "Testing primary group token behaviour and other constructed attributes\n"
+ print("Testing primary group token behaviour and other constructed attributes\n")
try:
ldb.add({
def test_tokenGroups(self):
"""Test the tokenGroups behaviour (hidden-generated-readonly attribute on SAM objects)"""
- print "Testing tokenGroups behaviour\n"
+ print("Testing tokenGroups behaviour\n")
# The domain object shouldn't contain any "tokenGroups" entry
res = ldb.search(self.base_dn, scope=SCOPE_BASE, attrs=["tokenGroups"])
def test_groupType(self):
"""Test the groupType behaviour"""
- print "Testing groupType behaviour\n"
+ print("Testing groupType behaviour\n")
# You can never create or change to a
# "GTYPE_SECURITY_BUILTIN_LOCAL_GROUP"
def test_pwdLastSet(self):
"""Test the pwdLastSet behaviour"""
- print "Testing pwdLastSet behaviour\n"
+ print("Testing pwdLastSet behaviour\n")
ldb.add({
"dn": "cn=ldaptestuser,cn=users," + self.base_dn,
def test_ldap_bind_must_change_pwd(self):
"""Test the error messages for failing LDAP binds"""
- print "Test the error messages for failing LDAP binds\n"
+ print("Test the error messages for failing LDAP binds\n")
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_userAccountControl(self):
"""Test the userAccountControl behaviour"""
- print "Testing userAccountControl behaviour\n"
+ print("Testing userAccountControl behaviour\n")
# With a user object
def test_smartcard_required1(self):
"""Test the UF_SMARTCARD_REQUIRED behaviour"""
- print "Testing UF_SMARTCARD_REQUIRED behaviour\n"
+ print("Testing UF_SMARTCARD_REQUIRED behaviour\n")
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_smartcard_required2(self):
"""Test the UF_SMARTCARD_REQUIRED behaviour"""
- print "Testing UF_SMARTCARD_REQUIRED behaviour\n"
+ print("Testing UF_SMARTCARD_REQUIRED behaviour\n")
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_smartcard_required3(self):
"""Test the UF_SMARTCARD_REQUIRED behaviour"""
- print "Testing UF_SMARTCARD_REQUIRED behaviour\n"
+ print("Testing UF_SMARTCARD_REQUIRED behaviour\n")
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_isCriticalSystemObject(self):
"""Test the isCriticalSystemObject behaviour"""
- print "Testing isCriticalSystemObject behaviour\n"
+ print("Testing isCriticalSystemObject behaviour\n")
# Add tests
def test_service_principal_name_updates(self):
"""Test the servicePrincipalNames update behaviour"""
- print "Testing servicePrincipalNames update behaviour\n"
+ print("Testing servicePrincipalNames update behaviour\n")
ldb.add({
"dn": "cn=ldaptestcomputer,cn=computers," + self.base_dn,
def test_sam_description_attribute(self):
"""Test SAM description attribute"""
- print "Test SAM description attribute"
+ print("Test SAM description attribute")
self.ldb.add({
"dn": "cn=ldaptestgroup,cn=users," + self.base_dn,
def test_fSMORoleOwner_attribute(self):
"""Test fSMORoleOwner attribute"""
- print "Test fSMORoleOwner attribute"
+ print("Test fSMORoleOwner attribute")
ds_service_name = self.ldb.get_dsServiceName()
def test_new_user_default_attributes(self):
"""Test default attributes for new user objects"""
- print "Test default attributes for new User objects\n"
+ print("Test default attributes for new User objects\n")
user_name = "ldaptestuser"
user_dn = "CN=%s,CN=Users,%s" % (user_name, self.base_dn)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
import optparse
import sys
import os
self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
self.addCleanup(self.delete_admin_connection)
- print "baseDN: %s" % self.base_dn
+ print("baseDN: %s" % self.base_dn)
def delete_admin_connection(self):
del self.sd_utils
self.assertTrue(ace in desc_sddl)
# Make sure we have identical result for both "add" and "modify"
res = re.search("(O:.*G:.*?)D:", desc_sddl).group(1)
- print self._testMethodName
+ print(self._testMethodName)
test_number = self._testMethodName[5:]
self.assertEqual(self.results[self.DS_BEHAVIOR][test_number], res)
object_dn = "OU=test_domain_ou1," + self.base_dn
delete_force(self.ldb_admin, object_dn)
self.ldb_admin.create_ou(object_dn)
- print self.get_users_domain_dn("testuser_attr")
+ print(self.get_users_domain_dn("testuser_attr"))
user_sid = self.sd_utils.get_object_sid(self.get_users_domain_dn("testuser_attr"))
#give testuser1 read access so attributes can be retrieved
mod = "(A;CI;RP;;;%s)" % str(user_sid)
self.assertFalse(sub_sddl2 == sub_sddl0)
if ace not in ou_sddl2:
- print "ou0: %s" % ou_sddl0
- print "ou2: %s" % ou_sddl2
+ print("ou0: %s" % ou_sddl0)
+ print("ou2: %s" % ou_sddl2)
if sub_ace not in sub_sddl2:
- print "sub0: %s" % sub_sddl0
- print "sub2: %s" % sub_sddl2
+ print("sub0: %s" % sub_sddl0)
+ print("sub2: %s" % sub_sddl2)
self.assertTrue(ace in ou_sddl2)
self.assertTrue(sub_ace in sub_sddl2)
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
+from __future__ import print_function
import optparse
import sys
try:
subnets.create_subnet(self.ldb, basedn, cidr, self.sitename)
except subnets.SubnetInvalid:
- print >> sys.stderr, "%s fails properly" % (cidr,)
+ print("%s fails properly" % (cidr,), file=sys.stderr)
continue
# we are here because it succeeded when it shouldn't have.
- print >> sys.stderr, "CIDR %s fails to fail" % (cidr,)
+ print("CIDR %s fails to fail" % (cidr,), file=sys.stderr)
failures.append(cidr)
subnets.delete_subnet(self.ldb, basedn, cidr)
if failures:
- print "These bad subnet names were accepted:"
+ print("These bad subnet names were accepted:")
for cidr in failures:
- print " %s" % cidr
+ print(" %s" % cidr)
self.fail()
def test_create_good_ranges(self):
try:
subnets.create_subnet(self.ldb, basedn, cidr, self.sitename)
except subnets.SubnetInvalid as e:
- print e
+ print(e)
failures.append(cidr)
continue
cidr))
if len(ret) != 1:
- print "%s was not created" % cidr
+ print("%s was not created" % cidr)
failures.append(cidr)
continue
subnets.delete_subnet(self.ldb, basedn, cidr)
if failures:
- print "These good subnet names were not accepted:"
+ print("These good subnet names were not accepted:")
for cidr in failures:
- print " %s" % cidr
+ print(" %s" % cidr)
self.fail()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Originally based on ./sam.py
+from __future__ import print_function
from unicodedata import normalize
import locale
locale.setlocale(locale.LC_ALL, ('en_US', 'UTF-8'))
try:
self.ldb.delete(self.ou, ['tree_delete:1'])
except ldb.LdbError as e:
- print "tried deleting %s, got error %s" % (self.ou, e)
+ print("tried deleting %s, got error %s" % (self.ou, e))
self.ldb.add({
"dn": self.ou,
expected_order = self.expected_results[attr][rev]
received_order = [norm(x[attr][0]) for x in res]
if expected_order != received_order:
- print attr, ['forward', 'reverse'][rev]
- print "expected", expected_order
- print "recieved", received_order
- print "unnormalised:", [x[attr][0] for x in res]
- print "unnormalised: «%s»" % '» «'.join(x[attr][0]
- for x in res)
+ print(attr, ['forward', 'reverse'][rev])
+ print("expected", expected_order)
+ print("recieved", received_order)
+ print("unnormalised:", [x[attr][0] for x in res])
+ print("unnormalised: «%s»" % '» «'.join(x[attr][0]
+ for x in res))
self.assertEquals(expected_order, received_order)
def _test_server_sort_binary(self):
expected_order = self.expected_results_binary[attr][rev]
received_order = [x[attr][0] for x in res]
if expected_order != received_order:
- print attr
- print expected_order
- print received_order
+ print(attr)
+ print(expected_order)
+ print(received_order)
self.assertEquals(expected_order, received_order)
def _test_server_sort_us_english(self):
expected_order = self.expected_results[attr][rev]
received_order = [norm(x[attr][0]) for x in res]
if expected_order != received_order:
- print attr, lang
- print ['forward', 'reverse'][rev]
- print "expected: ", expected_order
- print "recieved: ", received_order
- print "unnormalised:", [x[attr][0] for x in res]
- print "unnormalised: «%s»" % '» «'.join(x[attr][0]
- for x in res)
+ print(attr, lang)
+ print(['forward', 'reverse'][rev])
+ print("expected: ", expected_order)
+ print("recieved: ", received_order)
+ print("unnormalised:", [x[attr][0] for x in res])
+ print("unnormalised: «%s»" % '» «'.join(x[attr][0]
+ for x in res))
self.assertEquals(expected_order, received_order)
received_order = [norm(x[result_attr][0]) for x in res]
if expected_order != received_order:
- print sort_attr, result_attr, ['forward', 'reverse'][rev]
- print "expected", expected_order
- print "recieved", received_order
- print "unnormalised:", [x[result_attr][0] for x in res]
- print "unnormalised: «%s»" % '» «'.join(x[result_attr][0]
- for x in res)
- print "pairs:", pairs
+ print(sort_attr, result_attr, ['forward', 'reverse'][rev])
+ print("expected", expected_order)
+ print("recieved", received_order)
+ print("unnormalised:", [x[result_attr][0] for x in res])
+ print("unnormalised: «%s»" % '» «'.join(x[result_attr][0]
+ for x in res))
+ print("pairs:", pairs)
# There are bugs in Windows that we don't want (or
# know how) to replicate regarding timestamp sorting.
# Let's remind ourselves.
if result_attr == "msTSExpireDate4":
- print '-' * 72
+ print('-' * 72)
print ("This test fails against Windows with the "
"default number of elements (33).")
- print "Try with --elements=27 (or similar)."
- print '-' * 72
+ print("Try with --elements=27 (or similar).")
+ print('-' * 72)
self.assertEquals(expected_order, received_order)
for x in res:
# -*- coding: utf-8 -*-
# test tokengroups attribute against internal token calculation
+from __future__ import print_function
import optparse
import sys
import os
# Run the actual call loop.
while client_finished == False and server_finished == False:
if not client_finished:
- print "running client gensec_update"
+ print("running client gensec_update")
(client_finished, client_to_server) = gensec_client.update(server_to_client)
if not server_finished:
- print "running server gensec_update"
+ print("running server gensec_update")
(server_finished, server_to_client) = gensec_server.update(client_to_server)
session = gensec_server.session_info()
# Run the actual call loop.
while client_finished == False and server_finished == False:
if not client_finished:
- print "running client gensec_update"
+ print("running client gensec_update")
(client_finished, client_to_server) = gensec_client.update(server_to_client)
if not server_finished:
- print "running server gensec_update"
+ print("running server gensec_update")
(server_finished, server_to_client) = gensec_server.update(client_to_server)
session = gensec_server.session_info()
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from __future__ import print_function
import sys
import unittest
self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
def test_undelete(self):
- print "Testing standard undelete operation"
+ print("Testing standard undelete operation")
usr1 = "cn=testuser,cn=users," + self.base_dn
samba.tests.delete_force(self.samdb, usr1)
self.samdb.add({
samba.tests.delete_force(self.samdb, usr1)
def test_rename(self):
- print "Testing attempt to rename deleted object"
+ print("Testing attempt to rename deleted object")
usr1 = "cn=testuser,cn=users," + self.base_dn
self.samdb.add({
"dn": usr1,
self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
def test_undelete_with_mod(self):
- print "Testing standard undelete operation with modification of additional attributes"
+ print("Testing standard undelete operation with modification of additional attributes")
usr1 = "cn=testuser,cn=users," + self.base_dn
self.samdb.add({
"dn": usr1,
samba.tests.delete_force(self.samdb, usr1)
def test_undelete_newuser(self):
- print "Testing undelete user with a different dn"
+ print("Testing undelete user with a different dn")
usr1 = "cn=testuser,cn=users," + self.base_dn
usr2 = "cn=testuser2,cn=users," + self.base_dn
samba.tests.delete_force(self.samdb, usr1)
samba.tests.delete_force(self.samdb, usr2)
def test_undelete_existing(self):
- print "Testing undelete user after a user with the same dn has been created"
+ print("Testing undelete user after a user with the same dn has been created")
usr1 = "cn=testuser,cn=users," + self.base_dn
self.samdb.add({
"dn": usr1,
self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
def test_undelete_cross_nc(self):
- print "Cross NC undelete"
+ print("Cross NC undelete")
c1 = "cn=ldaptestcontainer," + self.base_dn
c2 = "cn=ldaptestcontainer2," + self.configuration_dn
c3 = "cn=ldaptestcontainer," + self.configuration_dn
(DRSUAPI_ATTID_isRecycled, 2)]
def test_restore_user(self):
- print "Test restored user attributes"
+ print("Test restored user attributes")
username = "restore_user"
usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
samba.tests.delete_force(self.samdb, usr_dn)
(DRSUAPI_ATTID_isRecycled, 2)]
def test_restorepw_user(self):
- print "Test restored user attributes"
+ print("Test restored user attributes")
username = "restorepw_user"
usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
samba.tests.delete_force(self.samdb, usr_dn)
'cn': groupname }
def test_plain_group(self):
- print "Test restored Group attributes"
+ print("Test restored Group attributes")
# create test group
obj = self._create_test_group("r_group")
guid = obj["objectGUID"][0]
self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
def test_group_with_members(self):
- print "Test restored Group with members attributes"
+ print("Test restored Group with members attributes")
# create test group
usr1 = self._create_test_user("r_user_1")
usr2 = self._create_test_user("r_user_2")
return self.search_dn(ou_dn)
def test_ou_with_name_description(self):
- print "Test OU reanimation"
+ print("Test OU reanimation")
# create OU to test with
obj = self._create_test_ou(rdn="r_ou",
name="r_ou name",
self.assertAttributesExists(expected_attrs, obj_restore)
def test_container(self):
- print "Test Container reanimation"
+ print("Test Container reanimation")
# create test Container
obj = self._create_object({
"dn": "CN=r_container,CN=Users,%s" % self.base_dn,
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from __future__ import print_function
import optparse
import sys
sys.path.insert(0, "bin/python")
self.ldb = samba.tests.connect_samdb(host, global_schema=False)
self.base_dn = self.ldb.domain_dn()
- print "baseDN: %s\n" % self.base_dn
+ print("baseDN: %s\n" % self.base_dn)
def test_nonurgent_object(self):
"""Test if the urgent replication is not activated when handling a non urgent object."""
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
except LdbError:
- print "Not testing urgent replication when creating classSchema object ...\n"
+ print("Not testing urgent replication when creating classSchema object ...\n")
# urgent replication should be enabled when modifying
m = Message()
# Licenced under the GPLv3
#
+from __future__ import print_function
import optparse
import sys
import unittest
msg = ldb.Message.from_dict(self.samdb, msg_dict )
msg["sAMAccountName"] = samaccountname
- print "Adding computer account %s" % computername
+ print("Adding computer account %s" % computername)
samdb.add(msg)
def get_creds(self, target_username, target_password):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Originally based on ./sam.py
+from __future__ import print_function
import optparse
import sys
import os
try:
self.ldb.delete(self.ou, ['tree_delete:1'])
except ldb.LdbError as e:
- print "tried deleting %s, got error %s" % (self.ou, e)
+ print("tried deleting %s, got error %s" % (self.ou, e))
self.ldb.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})
gte_map[k] = len(expected_order)
if False:
- print "gte_map:"
+ print("gte_map:")
for k in gte_order:
- print " %10s => %10s" % (k, gte_map[k])
+ print(" %10s => %10s" % (k, gte_map[k]))
return gte_order, expected_order, gte_map
return
if expected_order is not None:
- print "expected order: %s" % expected_order[:20]
+ print("expected order: %s" % expected_order[:20])
if len(expected_order) > 20:
- print "... and %d more not shown" % (len(expected_order) - 20)
+ print("... and %d more not shown" % (len(expected_order) - 20))
- print "offset %d before %d after %d" % (offset, before, after)
- print "start %d end %d" % (start, end)
- print "expected: %s" % expected_results
- print "got : %s" % results
+ print("offset %d before %d after %d" % (offset, before, after))
+ print("start %d end %d" % (start, end))
+ print("expected: %s" % expected_results)
+ print("got : %s" % results)
self.assertEquals(expected_results, results)
def test_server_vlv_with_cookie(self):
iteration += 1
if expected_results != results:
middle = expected_order[len(expected_order) // 2]
- print expected_results, results
- print middle
- print expected_order
- print
+ print(expected_results, results)
+ print(middle)
+ print(expected_order)
+ print()
print ("\nattr %s offset %d before %d "
"after %d gte %s" %
(attr, offset, before, after, gte))