Remove unnecessary python path updates for bundled subunit/testtools.
[amitay/samba.git] / lib / subunit / python / subunit / tests / test_subunit_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Tests for subunit.TestResultFilter."""
18
19 from datetime import datetime
20 import os
21 import subprocess
22 import sys
23 from subunit import iso8601
24 import unittest
25
26 from testtools import TestCase
27 from testtools.compat import _b, BytesIO
28 from testtools.testresult.doubles import ExtendedTestResult
29
30 import subunit
31 from subunit.test_results import make_tag_filter, TestResultFilter
32
33
class TestTestResultFilter(TestCase):
    """Test for TestResultFilter, a TestResult object which filters tests."""

    # While TestResultFilter works on python objects, using a subunit stream
    # is an easy pithy way of getting a series of test objects to call into
    # the TestResult, and as TestResultFilter is intended for use with subunit
    # also has the benefit of detecting any interface skew issues.
    #
    # The stream holds five tests, one per outcome: 'passed' (success),
    # 'failed' (failure, additionally tagged 'local'), 'error' (error),
    # 'skipped' (skip) and 'todo' (xfail). A 'global' tag is set before the
    # first test.
    example_subunit_stream = _b("""\
tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error [
error details
]
test skipped
skip skipped
test todo
xfail todo
""")

    def run_tests(self, result_filter, input_stream=None):
        """Run tests through the given filter.

        :param result_filter: A filtering TestResult object.
        :param input_stream: Bytes of subunit stream data. If not provided,
            uses TestTestResultFilter.example_subunit_stream.
        """
        if input_stream is None:
            input_stream = self.example_subunit_stream
        test = subunit.ProtocolTestCase(BytesIO(input_stream))
        test.run(result_filter)

    def test_default(self):
        """The default is to exclude success and include everything else."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result)
        self.run_tests(result_filter)
        # skips are seen as success by default python TestResult.
        self.assertEqual(['error'],
            [error[0].id() for error in filtered_result.errors])
        self.assertEqual(['failed'],
            [failure[0].id() for failure in
            filtered_result.failures])
        self.assertEqual(4, filtered_result.testsRun)

    def test_tag_filter(self):
        """A tag predicate drops the test that carries the 'local' tag."""
        tag_filter = make_tag_filter(['global'], ['local'])
        result = ExtendedTestResult()
        result_filter = TestResultFilter(
            result, filter_success=False, filter_predicate=tag_filter)
        self.run_tests(result_filter)
        tests_included = [
            event[1] for event in result._events if event[0] == 'startTest']
        # 'failed' is the only test in the example stream tagged 'local'.
        tests_expected = [
            subunit.RemotedTestCase(name)
            for name in ['passed', 'error', 'skipped', 'todo']]
        self.assertEqual(tests_expected, tests_included)

    def test_tags_tracked_correctly(self):
        """A tag set inside one test does not carry over to the next test."""
        tag_filter = make_tag_filter(['a'], [])
        result = ExtendedTestResult()
        result_filter = TestResultFilter(
            result, filter_success=False, filter_predicate=tag_filter)
        input_stream = _b(
            "test: foo\n"
            "tags: a\n"
            "successful: foo\n"
            "test: bar\n"
            "successful: bar\n")
        self.run_tests(result_filter, input_stream)
        foo = subunit.RemotedTestCase('foo')
        # Only foo's events are expected; bar never received the 'a' tag.
        self.assertEqual(
            [('startTest', foo),
             ('tags', set(['a']), set()),
             ('addSuccess', foo),
             ('stopTest', foo),
             ],
            result._events)

    def test_exclude_errors(self):
        """Errors can be filtered out with filter_error=True."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result, filter_error=True)
        self.run_tests(result_filter)
        # skips are seen as errors by default python TestResult.
        # NOTE(review): the comment above disagrees with test_default (which
        # says skips are seen as success) and with the empty error list
        # asserted below -- confirm which statement is intended.
        self.assertEqual([], filtered_result.errors)
        self.assertEqual(['failed'],
            [failure[0].id() for failure in
            filtered_result.failures])
        self.assertEqual(3, filtered_result.testsRun)

    def test_fixup_expected_failures(self):
        """A failure listed in fixup_expected_failures becomes expected."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result,
            fixup_expected_failures=set(["failed"]))
        self.run_tests(result_filter)
        self.assertEqual(['failed', 'todo'],
            [failure[0].id() for failure in filtered_result.expectedFailures])
        self.assertEqual([], filtered_result.failures)
        self.assertEqual(4, filtered_result.testsRun)

    def test_fixup_expected_errors(self):
        """An error listed in fixup_expected_failures becomes expected."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result,
            fixup_expected_failures=set(["error"]))
        self.run_tests(result_filter)
        self.assertEqual(['error', 'todo'],
            [failure[0].id() for failure in filtered_result.expectedFailures])
        self.assertEqual([], filtered_result.errors)
        self.assertEqual(4, filtered_result.testsRun)

    def test_fixup_unexpected_success(self):
        """A success listed in fixup_expected_failures is unexpected."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result, filter_success=False,
            fixup_expected_failures=set(["passed"]))
        self.run_tests(result_filter)
        self.assertEqual(['passed'],
            [passed.id() for passed in filtered_result.unexpectedSuccesses])
        self.assertEqual(5, filtered_result.testsRun)

    def test_exclude_failure(self):
        """Failures can be filtered out with filter_failure=True."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result, filter_failure=True)
        self.run_tests(result_filter)
        self.assertEqual(['error'],
            [error[0].id() for error in filtered_result.errors])
        self.assertEqual([],
            [failure[0].id() for failure in
            filtered_result.failures])
        self.assertEqual(3, filtered_result.testsRun)

    def test_exclude_skips(self):
        """Skips can be filtered out with filter_skip=True."""
        filtered_result = subunit.TestResultStats(None)
        result_filter = TestResultFilter(filtered_result, filter_skip=True)
        self.run_tests(result_filter)
        self.assertEqual(0, filtered_result.skipped_tests)
        self.assertEqual(2, filtered_result.failed_tests)
        self.assertEqual(3, filtered_result.testsRun)

    def test_include_success(self):
        """Successes can be included if requested."""
        filtered_result = unittest.TestResult()
        result_filter = TestResultFilter(filtered_result,
            filter_success=False)
        self.run_tests(result_filter)
        self.assertEqual(['error'],
            [error[0].id() for error in filtered_result.errors])
        self.assertEqual(['failed'],
            [failure[0].id() for failure in
            filtered_result.failures])
        self.assertEqual(5, filtered_result.testsRun)

    def test_filter_predicate(self):
        """You can filter by predicate callbacks"""
        # 0.0.7 and earlier did not support the 'tags' parameter, so we need
        # to test that we still support behaviour without it.
        filtered_result = unittest.TestResult()
        def filter_cb(test, outcome, err, details):
            return outcome == 'success'
        result_filter = TestResultFilter(filtered_result,
            filter_predicate=filter_cb,
            filter_success=False)
        self.run_tests(result_filter)
        # Only success should pass
        self.assertEqual(1, filtered_result.testsRun)

    def test_filter_predicate_with_tags(self):
        """You can filter by predicate callbacks that accept tags"""
        filtered_result = unittest.TestResult()
        def filter_cb(test, outcome, err, details, tags):
            return outcome == 'success'
        result_filter = TestResultFilter(filtered_result,
            filter_predicate=filter_cb,
            filter_success=False)
        self.run_tests(result_filter)
        # Only success should pass
        self.assertEqual(1, filtered_result.testsRun)

    def test_time_ordering_preserved(self):
        """'time' directives keep their order relative to included events."""
        # Passing a subunit stream through TestResultFilter preserves the
        # relative ordering of 'time' directives and any other subunit
        # directives that are still included.
        date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
        date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
        date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
        subunit_stream = _b('\n'.join([
            "time: %s",
            "test: foo",
            "time: %s",
            "error: foo",
            "time: %s",
            ""]) % (date_a, date_b, date_c))
        result = ExtendedTestResult()
        result_filter = TestResultFilter(result)
        self.run_tests(result_filter, subunit_stream)
        foo = subunit.RemotedTestCase('foo')
        # Show the complete event diff if the assertion fails.
        self.maxDiff = None
        self.assertEqual(
            [('time', date_a),
             ('time', date_b),
             ('startTest', foo),
             ('addError', foo, {}),
             ('stopTest', foo),
             ('time', date_c)], result._events)

    def test_time_passes_through_filtered_tests(self):
        """'time' directives survive even when the test is filtered out."""
        # Passing a subunit stream through TestResultFilter preserves 'time'
        # directives even if a specific test is filtered out.
        date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
        date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
        date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
        subunit_stream = _b('\n'.join([
            "time: %s",
            "test: foo",
            "time: %s",
            "success: foo",
            "time: %s",
            ""]) % (date_a, date_b, date_c))
        result = ExtendedTestResult()
        result_filter = TestResultFilter(result)
        result_filter.startTestRun()
        self.run_tests(result_filter, subunit_stream)
        result_filter.stopTestRun()
        foo = subunit.RemotedTestCase('foo')
        # Show the complete event diff if the assertion fails.
        self.maxDiff = None
        # date_b falls inside the filtered (successful) test and is not
        # expected in the output.
        self.assertEqual(
            [('startTestRun',),
             ('time', date_a),
             ('time', date_c),
             ('stopTestRun',),], result._events)

    def test_skip_preserved(self):
        """A skipped test is passed through by the default filter."""
        subunit_stream = _b('\n'.join([
            "test: foo",
            "skip: foo",
            ""]))
        result = ExtendedTestResult()
        result_filter = TestResultFilter(result)
        self.run_tests(result_filter, subunit_stream)
        foo = subunit.RemotedTestCase('foo')
        self.assertEqual(
            [('startTest', foo),
             ('addSkip', foo, {}),
             ('stopTest', foo), ], result._events)

    if sys.version_info < (2, 7):
        # These tests require Python >=2.7.
        del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
285
286
class TestFilterCommand(TestCase):
    """Tests that run the ``subunit-filter`` script as a child process."""

    example_subunit_stream = _b("""\
tags: global
test passed
success passed
test failed
tags: local
failure failed
test error
error error [
error details
]
test skipped
skip skipped
test todo
xfail todo
""")

    def run_command(self, args, stream):
        """Feed ``stream`` to subunit-filter and return what it writes.

        :param args: Iterable of command line arguments for the script.
        :param stream: Bytes sent to the script's standard input.
        :return: The bytes the script wrote to standard output.
        :raises RuntimeError: If the script exits with a non-zero status.
        """
        # The 'filters' directory sits four directory levels above this file.
        root = __file__
        for _ in range(4):
            root = os.path.dirname(root)
        script_path = os.path.join(root, 'filters', 'subunit-filter')
        command = [sys.executable, script_path]
        command.extend(args)
        process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        stdout_data, stderr_data = process.communicate(stream)
        if process.returncode == 0:
            return stdout_data
        raise RuntimeError("%s failed: %s" % (command, stderr_data))

    def to_events(self, stream):
        """Parse ``stream`` as subunit and return the recorded event list."""
        recorder = ExtendedTestResult()
        subunit.ProtocolTestCase(BytesIO(stream)).run(recorder)
        return recorder._events

    def test_default(self):
        """With no arguments a skipped test is passed through unchanged."""
        stream = _b(
            "test: foo\n"
            "skip: foo\n"
            )
        events = self.to_events(self.run_command([], stream))
        foo = subunit.RemotedTestCase('foo')
        expected = [
            ('startTest', foo),
            ('addSkip', foo, {}),
            ('stopTest', foo),
        ]
        self.assertEqual(expected, events)

    def test_tags(self):
        """``-s --with-tag a`` keeps only the tests tagged 'a'."""
        stream = _b(
            "tags: a\n"
            "test: foo\n"
            "success: foo\n"
            "tags: -a\n"
            "test: bar\n"
            "success: bar\n"
            "test: baz\n"
            "tags: a\n"
            "success: baz\n"
            )
        events = self.to_events(
            self.run_command(['-s', '--with-tag', 'a'], stream))
        foo = subunit.RemotedTestCase('foo')
        baz = subunit.RemotedTestCase('baz')
        # 'bar' runs while the 'a' tag is removed, so none of its events
        # are expected in the output.
        expected = [
            ('tags', set(['a']), set()),
            ('startTest', foo),
            ('addSuccess', foo),
            ('stopTest', foo),
            ('tags', set(), set(['a'])),
            ('startTest', baz),
            ('tags', set(['a']), set()),
            ('addSuccess', baz),
            ('stopTest', baz),
        ]
        self.assertEqual(expected, events)
365
366
def test_suite():
    """Return the tests loaded from this module by name."""
    return subunit.tests.TestUtil.TestLoader().loadTestsFromName(__name__)