PEP8: fix E713: test for membership should be 'not in'
[nivanova/samba-autobuild/.git] / source4 / scripting / devel / speedtest.py
index 84d3760c0506eb93971eb1c805227bfc64e5d6a8..e8d412436e63c865b2cd07014adc2fb65dbe2ba8 100755 (executable)
@@ -22,6 +22,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 
+from __future__ import print_function
 import optparse
 import sys
 import time
@@ -30,8 +31,7 @@ from decimal import Decimal
 
 sys.path.insert(0, "bin/python")
 import samba
-samba.ensure_external_module("testtools", "testtools")
-samba.ensure_external_module("subunit", "subunit/python")
+from samba.tests.subunitrun import TestProgram, SubunitOptions
 
 import samba.getopt as options
 
@@ -45,18 +45,17 @@ from samba.samdb import SamDB
 from samba.credentials import Credentials
 import samba.tests
 from samba.tests import delete_force
-from subunit.run import SubunitTestRunner
-import unittest
 
 parser = optparse.OptionParser("speedtest.py [options] <host>")
 sambaopts = options.SambaOptions(parser)
 parser.add_option_group(sambaopts)
 parser.add_option_group(options.VersionOptions(parser))
 
-
 # use command line creds if available
 credopts = options.CredentialsOptions(parser)
 parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
 opts, args = parser.parse_args()
 
 if len(args) < 1:
@@ -73,11 +72,12 @@ creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
 # Tests start here
 #
 
+
 class SpeedTest(samba.tests.TestCase):
 
     def find_domain_sid(self, ldb):
         res = ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
-        return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
+        return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
 
     def setUp(self):
         super(SpeedTest, self).setUp()
@@ -85,14 +85,14 @@ class SpeedTest(samba.tests.TestCase):
         self.base_dn = ldb.domain_dn()
         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
         self.user_pass = "samba123@"
-        print "baseDN: %s" % self.base_dn
+        print("baseDN: %s" % self.base_dn)
 
     def create_user(self, user_dn):
         ldif = """
 dn: """ + user_dn + """
 sAMAccountName: """ + user_dn.split(",")[0][3:] + """
 objectClass: user
-unicodePwd:: """ + base64.b64encode(("\"%s\"" % self.user_pass).encode('utf-16-le')) + """
+unicodePwd:: """ + base64.b64encode(("\"%s\"" % self.user_pass).encode('utf-16-le')).decode('utf8') + """
 url: www.example.com
 """
         self.ldb_admin.add_ldif(ldif)
@@ -109,11 +109,11 @@ url: www.example.com
 
     def create_bundle(self, count):
         for i in range(count):
-            self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+            self.create_user("cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
 
     def remove_bundle(self, count):
         for i in range(count):
-            delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i+1, self.base_dn))
+            delete_force(self.ldb_admin, "cn=speedtestuser%d,cn=Users,%s" % (i + 1, self.base_dn))
 
     def remove_test_users(self):
         res = ldb.search(base="cn=Users,%s" % self.base_dn, expression="(objectClass=user)", scope=SCOPE_SUBTREE)
@@ -121,30 +121,31 @@ url: www.example.com
         for dn in dn_list:
             delete_force(self.ldb_admin, dn)
 
+
 class SpeedTestAddDel(SpeedTest):
 
     def setUp(self):
         super(SpeedTestAddDel, self).setUp()
 
     def run_bundle(self, num):
-        print "\n=== Test ADD/DEL %s user objects ===\n" % num
+        print("\n=== Test ADD/DEL %s user objects ===\n" % num)
         avg_add = Decimal("0.0")
         avg_del = Decimal("0.0")
         for x in [1, 2, 3]:
             start = time.time()
             self.create_bundle(num)
-            res_add = Decimal( str(time.time() - start) )
+            res_add = Decimal(str(time.time() - start))
             avg_add += res_add
-            print "   Attempt %s ADD: %.3fs" % ( x, float(res_add) )
+            print("   Attempt %s ADD: %.3fs" % (x, float(res_add)))
             #
             start = time.time()
             self.remove_bundle(num)
-            res_del = Decimal( str(time.time() - start) )
+            res_del = Decimal(str(time.time() - start))
             avg_del += res_del
-            print "   Attempt %s DEL: %.3fs" % ( x, float(res_del) )
-        print "Average ADD: %.3fs" % float( Decimal(avg_add) / Decimal("3.0") )
-        print "Average DEL: %.3fs" % float( Decimal(avg_del) / Decimal("3.0") )
-        print ""
+            print("   Attempt %s DEL: %.3fs" % (x, float(res_del)))
+        print("Average ADD: %.3fs" % float(Decimal(avg_add) / Decimal("3.0")))
+        print("Average DEL: %.3fs" % float(Decimal(avg_del) / Decimal("3.0")))
+        print("")
 
     def test_00000(self):
         """ Remove possibly undeleted test users from previous test
@@ -166,6 +167,7 @@ class SpeedTestAddDel(SpeedTest):
         """
         self.run_bundle(10000)
 
+
 class AclSearchSpeedTest(SpeedTest):
 
     def setUp(self):
@@ -180,22 +182,22 @@ class AclSearchSpeedTest(SpeedTest):
         delete_force(self.ldb_admin, self.get_user_dn("acltestuser"))
 
     def run_search_bundle(self, num, _ldb):
-        print "\n=== Creating %s user objects ===\n" % num
+        print("\n=== Creating %s user objects ===\n" % num)
         self.create_bundle(num)
         mod = "(A;;LC;;;%s)(D;;RP;;;%s)" % (str(self.user_sid), str(self.user_sid))
         for i in range(num):
             self.sd_utils.dacl_add_ace("cn=speedtestuser%d,cn=Users,%s" %
-                                       (i+1, self.base_dn), mod)
-        print "\n=== %s user objects created ===\n" % num
-        print "\n=== Test search on %s user objects ===\n" % num
+                                       (i + 1, self.base_dn), mod)
+        print("\n=== %s user objects created ===\n" % num)
+        print("\n=== Test search on %s user objects ===\n" % num)
         avg_search = Decimal("0.0")
         for x in [1, 2, 3]:
             start = time.time()
             res = _ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
-            res_search = Decimal( str(time.time() - start) )
+            res_search = Decimal(str(time.time() - start))
             avg_search += res_search
-            print "   Attempt %s SEARCH: %.3fs" % ( x, float(res_search) )
-        print "Average Search: %.3fs" % float( Decimal(avg_search) / Decimal("3.0") )
+            print("   Attempt %s SEARCH: %.3fs" % (x, float(res_search)))
+        print("Average Search: %.3fs" % float(Decimal(avg_search) / Decimal("3.0")))
         self.remove_bundle(num)
 
     def get_user_dn(self, name):
@@ -224,18 +226,11 @@ class AclSearchSpeedTest(SpeedTest):
 
 # Important unit running information
 
-if not "://" in host:
+
+if "://" not in host:
     host = "ldap://%s" % host
 
 ldb_options = ["modules:paged_searches"]
 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp, options=ldb_options)
 
-runner = SubunitTestRunner()
-suite = unittest.TestSuite()
-suite.addTests(unittest.makeSuite(SpeedTestAddDel))
-suite.addTests(unittest.makeSuite(AclSearchSpeedTest))
-if not runner.run(suite).wasSuccessful():
-    rc = 1
-else:
-    rc = 0
-sys.exit(rc)
+TestProgram(module=__name__, opts=subunitopts)