2 # subunit: extensions to python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Tests for subunit.TestResultFilter."""
19 from datetime import datetime
23 from subunit import iso8601
26 from testtools import TestCase
27 from testtools.compat import _b, BytesIO
28 from testtools.testresult.doubles import ExtendedTestResult
31 from subunit.test_results import make_tag_filter, TestResultFilter
34 class TestTestResultFilter(TestCase):
35 """Test for TestResultFilter, a TestResult object which filters tests."""
37 # While TestResultFilter works on python objects, using a subunit stream
38 # is an easy pithy way of getting a series of test objects to call into
39 # the TestResult, and as TestResultFilter is intended for use with subunit
40 # also has the benefit of detecting any interface skew issues.
41 example_subunit_stream = _b("""\
58 def run_tests(self, result_filter, input_stream=None):
59 """Run tests through the given filter.
61 :param result_filter: A filtering TestResult object.
62 :param input_stream: Bytes of subunit stream data. If not provided,
63 uses TestTestResultFilter.example_subunit_stream.
65 if input_stream is None:
66 input_stream = self.example_subunit_stream
67 test = subunit.ProtocolTestCase(BytesIO(input_stream))
68 test.run(result_filter)
70 def test_default(self):
71 """The default is to exclude success and include everything else."""
72 filtered_result = unittest.TestResult()
73 result_filter = TestResultFilter(filtered_result)
74 self.run_tests(result_filter)
75 # skips are seen as success by default python TestResult.
76 self.assertEqual(['error'],
77 [error[0].id() for error in filtered_result.errors])
78 self.assertEqual(['failed'],
79 [failure[0].id() for failure in
80 filtered_result.failures])
81 self.assertEqual(4, filtered_result.testsRun)
83 def test_tag_filter(self):
84 tag_filter = make_tag_filter(['global'], ['local'])
85 result = ExtendedTestResult()
86 result_filter = TestResultFilter(
87 result, filter_success=False, filter_predicate=tag_filter)
88 self.run_tests(result_filter)
90 event[1] for event in result._events if event[0] == 'startTest']
91 tests_expected = list(map(
92 subunit.RemotedTestCase,
93 ['passed', 'error', 'skipped', 'todo']))
94 self.assertEquals(tests_expected, tests_included)
96 def test_tags_tracked_correctly(self):
97 tag_filter = make_tag_filter(['a'], [])
98 result = ExtendedTestResult()
99 result_filter = TestResultFilter(
100 result, filter_success=False, filter_predicate=tag_filter)
107 self.run_tests(result_filter, input_stream)
108 foo = subunit.RemotedTestCase('foo')
111 ('tags', set(['a']), set()),
117 def test_exclude_errors(self):
118 filtered_result = unittest.TestResult()
119 result_filter = TestResultFilter(filtered_result, filter_error=True)
120 self.run_tests(result_filter)
121 # skips are seen as errors by default python TestResult.
122 self.assertEqual([], filtered_result.errors)
123 self.assertEqual(['failed'],
124 [failure[0].id() for failure in
125 filtered_result.failures])
126 self.assertEqual(3, filtered_result.testsRun)
128 def test_fixup_expected_failures(self):
129 filtered_result = unittest.TestResult()
130 result_filter = TestResultFilter(filtered_result,
131 fixup_expected_failures=set(["failed"]))
132 self.run_tests(result_filter)
133 self.assertEqual(['failed', 'todo'],
134 [failure[0].id() for failure in filtered_result.expectedFailures])
135 self.assertEqual([], filtered_result.failures)
136 self.assertEqual(4, filtered_result.testsRun)
138 def test_fixup_expected_errors(self):
139 filtered_result = unittest.TestResult()
140 result_filter = TestResultFilter(filtered_result,
141 fixup_expected_failures=set(["error"]))
142 self.run_tests(result_filter)
143 self.assertEqual(['error', 'todo'],
144 [failure[0].id() for failure in filtered_result.expectedFailures])
145 self.assertEqual([], filtered_result.errors)
146 self.assertEqual(4, filtered_result.testsRun)
148 def test_fixup_unexpected_success(self):
149 filtered_result = unittest.TestResult()
150 result_filter = TestResultFilter(filtered_result, filter_success=False,
151 fixup_expected_failures=set(["passed"]))
152 self.run_tests(result_filter)
153 self.assertEqual(['passed'],
154 [passed.id() for passed in filtered_result.unexpectedSuccesses])
155 self.assertEqual(5, filtered_result.testsRun)
157 def test_exclude_failure(self):
158 filtered_result = unittest.TestResult()
159 result_filter = TestResultFilter(filtered_result, filter_failure=True)
160 self.run_tests(result_filter)
161 self.assertEqual(['error'],
162 [error[0].id() for error in filtered_result.errors])
164 [failure[0].id() for failure in
165 filtered_result.failures])
166 self.assertEqual(3, filtered_result.testsRun)
168 def test_exclude_skips(self):
169 filtered_result = subunit.TestResultStats(None)
170 result_filter = TestResultFilter(filtered_result, filter_skip=True)
171 self.run_tests(result_filter)
172 self.assertEqual(0, filtered_result.skipped_tests)
173 self.assertEqual(2, filtered_result.failed_tests)
174 self.assertEqual(3, filtered_result.testsRun)
176 def test_include_success(self):
177 """Successes can be included if requested."""
178 filtered_result = unittest.TestResult()
179 result_filter = TestResultFilter(filtered_result,
180 filter_success=False)
181 self.run_tests(result_filter)
182 self.assertEqual(['error'],
183 [error[0].id() for error in filtered_result.errors])
184 self.assertEqual(['failed'],
185 [failure[0].id() for failure in
186 filtered_result.failures])
187 self.assertEqual(5, filtered_result.testsRun)
189 def test_filter_predicate(self):
190 """You can filter by predicate callbacks"""
191 # 0.0.7 and earlier did not support the 'tags' parameter, so we need
192 # to test that we still support behaviour without it.
193 filtered_result = unittest.TestResult()
194 def filter_cb(test, outcome, err, details):
195 return outcome == 'success'
196 result_filter = TestResultFilter(filtered_result,
197 filter_predicate=filter_cb,
198 filter_success=False)
199 self.run_tests(result_filter)
200 # Only success should pass
201 self.assertEqual(1, filtered_result.testsRun)
203 def test_filter_predicate_with_tags(self):
204 """You can filter by predicate callbacks that accept tags"""
205 filtered_result = unittest.TestResult()
206 def filter_cb(test, outcome, err, details, tags):
207 return outcome == 'success'
208 result_filter = TestResultFilter(filtered_result,
209 filter_predicate=filter_cb,
210 filter_success=False)
211 self.run_tests(result_filter)
212 # Only success should pass
213 self.assertEqual(1, filtered_result.testsRun)
215 def test_time_ordering_preserved(self):
216 # Passing a subunit stream through TestResultFilter preserves the
217 # relative ordering of 'time' directives and any other subunit
218 # directives that are still included.
219 date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
220 date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
221 date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
222 subunit_stream = _b('\n'.join([
228 ""]) % (date_a, date_b, date_c))
229 result = ExtendedTestResult()
230 result_filter = TestResultFilter(result)
231 self.run_tests(result_filter, subunit_stream)
232 foo = subunit.RemotedTestCase('foo')
238 ('addError', foo, {}),
240 ('time', date_c)], result._events)
242 def test_time_passes_through_filtered_tests(self):
243 # Passing a subunit stream through TestResultFilter preserves 'time'
244 # directives even if a specific test is filtered out.
245 date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
246 date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
247 date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
248 subunit_stream = _b('\n'.join([
254 ""]) % (date_a, date_b, date_c))
255 result = ExtendedTestResult()
256 result_filter = TestResultFilter(result)
257 result_filter.startTestRun()
258 self.run_tests(result_filter, subunit_stream)
259 result_filter.stopTestRun()
260 foo = subunit.RemotedTestCase('foo')
266 ('stopTestRun',),], result._events)
268 def test_skip_preserved(self):
269 subunit_stream = _b('\n'.join([
273 result = ExtendedTestResult()
274 result_filter = TestResultFilter(result)
275 self.run_tests(result_filter, subunit_stream)
276 foo = subunit.RemotedTestCase('foo')
279 ('addSkip', foo, {}),
280 ('stopTest', foo), ], result._events)
282 if sys.version_info < (2, 7):
283 # These tests require Python >=2.7.
284 del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
287 class TestFilterCommand(TestCase):
289 example_subunit_stream = _b("""\
306 def run_command(self, args, stream):
307 root = os.path.dirname(
308 os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
309 script_path = os.path.join(root, 'filters', 'subunit-filter')
310 command = [sys.executable, script_path] + list(args)
311 ps = subprocess.Popen(
312 command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
313 stderr=subprocess.PIPE)
314 out, err = ps.communicate(stream)
315 if ps.returncode != 0:
316 raise RuntimeError("%s failed: %s" % (command, err))
319 def to_events(self, stream):
320 test = subunit.ProtocolTestCase(BytesIO(stream))
321 result = ExtendedTestResult()
323 return result._events
325 def test_default(self):
326 output = self.run_command([], _b(
330 events = self.to_events(output)
331 foo = subunit.RemotedTestCase('foo')
334 ('addSkip', foo, {}),
339 output = self.run_command(['-s', '--with-tag', 'a'], _b(
350 events = self.to_events(output)
351 foo = subunit.RemotedTestCase('foo')
352 baz = subunit.RemotedTestCase('baz')
354 [('tags', set(['a']), set()),
358 ('tags', set(), set(['a'])),
360 ('tags', set(['a']), set()),
368 loader = subunit.tests.TestUtil.TestLoader()
369 result = loader.loadTestsFromName(__name__)