# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
import optparse
import sys
import time
sys.path.insert(0, "bin/python")
import samba
-samba.ensure_external_module("testtools", "testtools")
-samba.ensure_external_module("subunit", "subunit/python")
+from samba.tests.subunitrun import TestProgram, SubunitOptions
import samba.getopt as options
from samba.credentials import Credentials
import samba.tests
from samba.tests import delete_force
-from subunit.run import SubunitTestRunner
-import unittest
parser = optparse.OptionParser("speedtest.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
-
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
if len(args) < 1:
# Tests start here
#
+
class SpeedTest(samba.tests.TestCase):
def find_domain_sid(self, ldb):
res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def setUp(self):
super(SpeedTest, self).setUp()
self.base_dn = ldb.domain_dn()
self.domain_sid = security.dom_sid(ldb.get_domain_sid())
self.user_pass = "samba123@"
- print "baseDN: %s" % self.base_dn
+ print("baseDN: %s" % self.base_dn)
def create_user(self, user_dn):
ldif = """
dn: """ + user_dn + """
sAMAccountName: """ + user_dn.split(",")[0][3:] + """
objectClass: user
-unicodePwd:: """ + base64.b64encode(("\"%s\"" % self.user_pass).encode('utf-16-le')) + """
+unicodePwd:: """ + base64.b64encode(("\"%s\"" % self.user_pass).encode('utf-16-le')).decode('utf8') + """
url: www.example.com
"""
self.ldb_admin.add_ldif(ldif)
def create_bundle(self, count):
for i in range(count):
- self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+ self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
def remove_bundle(self, count):
for i in range(count):
- delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+ delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
def remove_test_users(self):
res = ldb.search(base="cn=Users,%s" % self.base_dn, expression="(objectClass=user)", scope=SCOPE_SUBTREE)
for dn in dn_list:
delete_force(self.ldb_admin, dn)
+
class SpeedTestAddDel(SpeedTest):
def setUp(self):
super(SpeedTestAddDel, self).setUp()
def run_bundle(self, num):
- print "\n=== Test ADD/DEL %s user objects ===\n" % num
+ print("\n=== Test ADD/DEL %s user objects ===\n" % num)
avg_add = Decimal("0.0")
avg_del = Decimal("0.0")
for x in [1, 2, 3]:
start = time.time()
self.create_bundle(num)
- res_add = Decimal( str(time.time() - start) )
+ res_add = Decimal(str(time.time() - start))
avg_add += res_add
- print " Attempt %s ADD: %.3fs" % ( x, float(res_add) )
+ print(" Attempt %s ADD: %.3fs" % (x, float(res_add)))
#
start = time.time()
self.remove_bundle(num)
- res_del = Decimal( str(time.time() - start) )
+ res_del = Decimal(str(time.time() - start))
avg_del += res_del
- print " Attempt %s DEL: %.3fs" % ( x, float(res_del) )
- print "Average ADD: %.3fs" % float( Decimal(avg_add) / Decimal("3.0") )
- print "Average DEL: %.3fs" % float( Decimal(avg_del) / Decimal("3.0") )
- print ""
+ print(" Attempt %s DEL: %.3fs" % (x, float(res_del)))
+ print("Average ADD: %.3fs" % float(Decimal(avg_add) / Decimal("3.0")))
+ print("Average DEL: %.3fs" % float(Decimal(avg_del) / Decimal("3.0")))
+ print("")
def test_00000(self):
""" Remove possibly undeleted test users from previous test
"""
self.run_bundle(10000)
+
class AclSearchSpeedTest(SpeedTest):
def setUp(self):
delete_force(self.ldb_admin, self.get_user_dn("acltestuser"))
def run_search_bundle(self, num, _ldb):
- print "\n=== Creating %s user objects ===\n" % num
+ print("\n=== Creating %s user objects ===\n" % num)
self.create_bundle(num)
mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
for i in range(num):
self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" %
- (i+1, self.base_dn), mod)
- print "\n=== %s user objects created ===\n" % num
- print "\n=== Test search on %s user objects ===\n" % num
+ (i + 1, self.base_dn), mod)
+ print("\n=== %s user objects created ===\n" % num)
+ print("\n=== Test search on %s user objects ===\n" % num)
avg_search = Decimal("0.0")
for x in [1, 2, 3]:
start = time.time()
res = _ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
- res_search = Decimal( str(time.time() - start) )
+ res_search = Decimal(str(time.time() - start))
avg_search += res_search
- print " Attempt %s SEARCH: %.3fs" % ( x, float(res_search) )
- print "Average Search: %.3fs" % float( Decimal(avg_search) / Decimal("3.0") )
+ print(" Attempt %s SEARCH: %.3fs" % (x, float(res_search)))
+ print("Average Search: %.3fs" % float(Decimal(avg_search) / Decimal("3.0")))
self.remove_bundle(num)
def get_user_dn(self, name):
# Important unit running information
-if not "://" in host:
+
+if "://" not in host:
host = "ldap://%s" % host
ldb_options = ["modules:paged_searches"]
ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
-runner = SubunitTestRunner()
-suite = unittest.TestSuite()
-suite.addTests(unittest.makeSuite(SpeedTestAddDel))
-suite.addTests(unittest.makeSuite(AclSearchSpeedTest))
-if not runner.run(suite).wasSuccessful():
- rc = 1
-else:
- rc = 0
-sys.exit(rc)
+TestProgram(module=__name__, opts=subunitopts)