selftest.run: Factor out read_testlist_file and open_file_or_pipe.
authorJelmer Vernooij <jelmer@samba.org>
Mon, 5 Mar 2012 03:05:35 +0000 (04:05 +0100)
committerJelmer Vernooij <jelmer@samba.org>
Mon, 5 Mar 2012 04:42:19 +0000 (05:42 +0100)
Autobuild-User: Jelmer Vernooij <jelmer@samba.org>
Autobuild-Date: Mon Mar  5 05:42:19 CET 2012 on sn-devel-104

selftest/selftest.py
selftest/testlist.py
selftest/tests/test_testlist.py

index 9ca1e45c1df1466e2c401d15a55b66f95d88e544..26c409af453d4a409ce26c58da839320f0130f9d 100755 (executable)
@@ -344,25 +344,16 @@ else:
 os.environ["SELFTEST_MAXTIME"] = str(torture_maxtime)
 
 
 os.environ["SELFTEST_MAXTIME"] = str(torture_maxtime)
 
 
-def open_file_or_pipe(path, mode):
-    if path.endswith("|"):
-        return os.popen(path[:-1], mode)
-    return open(path, mode)
-
 available = []
 for fn in opts.testlist:
 available = []
 for fn in opts.testlist:
-    inf = open_file_or_pipe(fn, 'r')
-    try:
-        for testsuite in testlist.read_testlist(inf, sys.stdout):
-            if not testlist.should_run_test(tests, testsuite):
-                continue
-            name = testsuite[0]
-            if (includes is not None and
-                testlist.find_in_list(includes, name) is not None):
-                continue
-            available.append(testsuite)
-    finally:
-        inf.close()
+    for testsuite in testlist.read_testlist_file(fn):
+        if not testlist.should_run_test(tests, testsuite):
+            continue
+        name = testsuite[0]
+        if (includes is not None and
+            testlist.find_in_list(includes, name) is not None):
+            continue
+        available.append(testsuite)
 
 if opts.load_list:
     restricted_mgr = testlist.RestrictedTestManager.from_path(opts.load_list)
 
 if opts.load_list:
     restricted_mgr = testlist.RestrictedTestManager.from_path(opts.load_list)
index 441dda7cbb102e216374ba5ab00c6a77dab2ff92..5102f4288bd84a534e60e4af679c4437cc360b7b 100644 (file)
@@ -21,7 +21,9 @@
 
 __all__ = ['find_in_list', 'read_test_regexes', 'read_testlist']
 
 
 __all__ = ['find_in_list', 'read_test_regexes', 'read_testlist']
 
+import os
 import re
 import re
+import sys
 
 def find_in_list(list, fullname):
     """Find test in list.
 
 def find_in_list(list, fullname):
     """Find test in list.
@@ -133,3 +135,34 @@ class RestrictedTestManager(object):
         :return: Iterator over test list entries that were not used.
         """
         return iter(self.unused)
         :return: Iterator over test list entries that were not used.
         """
         return iter(self.unused)
+
+
+def open_file_or_pipe(path, mode):
+    """Open a file or pipe.
+
+    :param path: Path to open; if it ends with | it is assumed to be a
+        command to run
+    :param mode: Mode with which to open it
+    :return: File-like object
+    """
+    if path.endswith("|"):
+        return os.popen(path[:-1], mode)
+    return open(path, mode)
+
+
+def read_testlist_file(fn, outf=None):
+    """Read testlist file.
+
+    :param fn: Path to read (assumed to be a command to run if it ends with |)
+    :param outf: File-like object to pass non-test data through to
+        (defaults to stdout)
+    :return: Iterator over test suites (see read_testlist)
+    """
+    if outf is None:
+        outf = sys.stdout
+    inf = open_file_or_pipe(fn, 'r')
+    try:
+        for testsuite in read_testlist(inf, outf):
+            yield testsuite
+    finally:
+        inf.close()
index cd44d46238159cd696d2f1314b352c922e9c9499..4474d0aa07a097bfee8523ed8dc789f9d181d258 100644 (file)
 
 """Tests for selftest.testlist."""
 
 
 """Tests for selftest.testlist."""
 
+import os
+import tempfile
+
 from selftest.tests import TestCase
 
 from selftest.testlist import (
     RestrictedTestManager,
     find_in_list,
 from selftest.tests import TestCase
 
 from selftest.testlist import (
     RestrictedTestManager,
     find_in_list,
+    open_file_or_pipe,
     read_test_regexes,
     read_testlist,
     read_test_regexes,
     read_testlist,
+    read_testlist_file,
     )
 
 from cStringIO import StringIO
     )
 
 from cStringIO import StringIO
@@ -100,3 +105,44 @@ class RestrictedTestManagerTests(TestCase):
     def test_run_nomatch(self):
         mgr = RestrictedTestManager(["foo.bar"])
         self.assertEquals([], mgr.should_run_testsuite("foo.blie.bla"))
     def test_run_nomatch(self):
         mgr = RestrictedTestManager(["foo.bar"])
         self.assertEquals([], mgr.should_run_testsuite("foo.blie.bla"))
+
+
+class OpenFileOrPipeTests(TestCase):
+
+    def test_regular_file(self):
+        (fd, p) = tempfile.mkstemp()
+        self.addCleanup(os.remove, p)
+        f = os.fdopen(fd, 'w')
+        try:
+            f.write('data\nbla\n')
+        finally:
+            f.close()
+        f = open_file_or_pipe(p, 'r')
+        try:
+            self.assertEquals("data\nbla\n", f.read())
+        finally:
+            f.close()
+
+    def test_pipe(self):
+        f = open_file_or_pipe('echo foo|', 'r')
+        try:
+            self.assertEquals("foo\n", f.read())
+        finally:
+            f.close()
+
+
+class ReadTestListFileTests(TestCase):
+
+    def test_regular_file(self):
+        (fd, p) = tempfile.mkstemp()
+        self.addCleanup(os.remove, p)
+        f = os.fdopen(fd, 'w')
+        try:
+            f.write('noise\n-- TEST --\ndata\nenv\ncmd\n')
+        finally:
+            f.close()
+        outf = StringIO()
+        self.assertEquals(
+            [('data', 'env', 'cmd', False, False)],
+            list(read_testlist_file(p, outf)))
+        self.assertEquals("noise\n", outf.getvalue())