Switch to using command line options to specify status. Expand help output, and refac...
authorThomi Richards <thomi.richards@canonical.com>
Wed, 20 Nov 2013 01:09:48 +0000 (14:09 +1300)
committerThomi Richards <thomi.richards@canonical.com>
Wed, 20 Nov 2013 01:09:48 +0000 (14:09 +1300)
python/subunit/_output.py
python/subunit/tests/test_output_filter.py

index c65fbe066ff3bfc274af284c32c992176a45b869..ae405ef4de7fe7180ea5c271e3f623ba72e52967 100644 (file)
@@ -12,7 +12,7 @@
 #  license you chose for the specific language governing permissions and
 #  limitations under that license.
 
-from argparse import ArgumentParser
+from argparse import ArgumentError, ArgumentParser, Action
 import datetime
 from functools import partial
 from sys import stdin, stdout
@@ -37,24 +37,74 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
 
     ParserClass can be specified to override the class we use to parse the
     command-line arguments. This is useful for testing.
+
     """
 
-    file_args = ParserClass(add_help=False)
-    file_args.add_argument(
+    class StatusAction(Action):
+        """A custom action that stores option name and argument separately.
+
+        This is part of a workaround for the fact that argparse does not
+        support optional subcommands (http://bugs.python.org/issue9253).
+        """
+
+        def __init__(self, status_name, *args, **kwargs):
+            super(StatusAction, self).__init__(*args, **kwargs)
+            self._status_name = status_name
+
+        def __call__(self, parser, namespace, values, option_string=None):
+            if getattr(namespace, self.dest, None) is not None:
+                raise ArgumentError(self, "Only one status may be specified at once.")
+            setattr(namespace, self.dest, self._status_name)
+            setattr(namespace, 'test_id', values[0])
+
+
+    parser = ParserClass(
+        prog='subunit-output',
+        description="A tool to generate a subunit result byte-stream",
+    )
+
+    status_commands = parser.add_argument_group(
+        "Status Commands",
+        "These options report the status of a test. TEST_ID must be a string "
+            "that uniquely identifies the test."
+    )
+    final_actions = 'success fail skip xfail uxsuccess'.split()
+    for action in "inprogress success fail skip exists xfail uxsuccess".split():
+        final_text =  "This is a final state: No more status reports may "\
+            "be generated for this test id after this one."
+
+        status_commands.add_argument(
+            "--%s" % action,
+            nargs=1,
+            action=partial(StatusAction, action),
+            dest="action",
+            metavar="TEST_ID",
+            help="Report a test status." + final_text if action in final_actions else ""
+        )
+
+    file_commands = parser.add_argument_group(
+        "File Options",
+        "These options control attaching data to a result stream. They can "
+            "either be specified with a status command, in which case the file "
+            "is attached to the test status, or by themselves, in which case "
+            "the file is attached to the stream (and not associated with any "
+            "test id)."
+    )
+    file_commands.add_argument(
         "--attach-file",
         help="Attach a file to the result stream for this test. If '-' is "
             "specified, stdin will be read instead. In this case, the file "
             "name will be set to 'stdin' (but can still be overridden with "
             "the --file-name option)."
     )
-    file_args.add_argument(
+    file_commands.add_argument(
         "--file-name",
         help="The name to give this file attachment. If not specified, the "
             "name of the file on disk will be used, or 'stdin' in the case "
             "where '-' was passed to the '--attach-file' argument. This option"
             " may only be specified when '--attach-file' is specified.",
         )
-    file_args.add_argument(
+    file_commands.add_argument(
         "--mimetype",
         help="The mime type to send with this file. This is only used if the "
             "--attach-file argument is used. This argument is optional. If it "
@@ -63,85 +113,19 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
         default=None
     )
 
-    common_args = ParserClass(add_help=False)
-    common_args.add_argument(
-        "test_id",
-        help="A string that uniquely identifies this test."
-    )
-    common_args.add_argument(
+    parser.add_argument(
         "--tags",
-        help="A comma-separated list of tags to associate with this test.",
+        help="A comma-separated list of tags to associate with a test. This "
+            "option may only be used with a status command.",
         type=lambda s: s.split(','),
         default=None
     )
 
-    parser = ParserClass(
-        prog='subunit-output',
-        description="A tool to generate a subunit result byte-stream",
-        usage="%(prog)s [-h] action [-h] test [--attach-file ATTACH_FILE]"
-            "[--mimetype MIMETYPE] [--tags TAGS]",
-        epilog="Additional help can be printed by passing -h to an action"
-            "(e.g.- '%(prog)s pass -h' will show help for the 'pass' action).",
-        parents=[file_args]
-    )
-    sub_parsers = parser.add_subparsers(
-        dest="action",
-        title="actions",
-        description="These actions are supported by this tool",
-    )
-
-    final_state = "This is a final action: No more actions may be generated "\
-        "for this test id after this one."
-
-    sub_parsers.add_parser(
-        "inprogress",
-        help="Report that a test is in progress.",
-        parents=[common_args, file_args]
-    )
-
-    sub_parsers.add_parser(
-        "success",
-        help="Report that a test has succeeded. " + final_state,
-        parents=[common_args, file_args],
-    )
-
-    sub_parsers.add_parser(
-        "fail",
-        help="Report that a test has failed. " + final_state,
-        parents=[common_args, file_args]
-    )
-
-    sub_parsers.add_parser(
-        "skip",
-        help="Report that a test was skipped. " + final_state,
-        parents=[common_args, file_args]
-    )
-
-    sub_parsers.add_parser(
-        "exists",
-        help="Report that a test exists. " + final_state,
-        parents=[common_args, file_args]
-    )
-
-    sub_parsers.add_parser(
-        "xfail",
-        help="Report that a test has failed expectedly (this is not counted as "
-            "a failure). " + final_state,
-        parents=[common_args, file_args],
-    )
-
-    sub_parsers.add_parser(
-        "uxsuccess",
-        help="Report that a test has succeeded unexpectedly (this is counted "
-            " as a failure). " + final_state,
-        parents=[common_args, file_args],
-    )
-
     args = parser.parse_args(args)
     if args.mimetype and not args.attach_file:
-        parser.error("Cannot specify --mimetype without --attach_file")
+        parser.error("Cannot specify --mimetype without --attach-file")
     if args.file_name and not args.attach_file:
-        parser.error("Cannot specify --file-name without --attach_file")
+        parser.error("Cannot specify --file-name without --attach-file")
     if args.attach_file:
         if args.attach_file == '-':
             if not args.file_name:
@@ -152,6 +136,9 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
                 args.attach_file = open(args.attach_file)
             except IOError as e:
                 parser.error("Cannot open %s (%s)" % (args.attach_file, e.strerror))
+    if args.tags and not args.action:
+        parser.error("Cannot specify --tags without a status command")
+
     return args
 
 
index 8fda9aa4f160b5012d05f15f8410442a10369652..102b97043cfab9bd9e963741d731165181abd177 100644 (file)
@@ -31,6 +31,7 @@ from testtools.matchers import (
     Matcher,
     MatchesListwise,
     Mismatch,
+    raises,
 )
 from testtools.testresult.doubles import StreamResult
 
@@ -48,10 +49,7 @@ class SafeArgumentParser(argparse.ArgumentParser):
     """An ArgumentParser class that doesn't call sys.exit."""
 
     def exit(self, status=0, message=""):
-        raise RuntimeError(
-            "ArgumentParser requested to exit with status %d and message %r"
-            % (status, message)
-        )
+        raise RuntimeError(message)
 
 
 safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
@@ -60,56 +58,60 @@ safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
 class TestStatusArgParserTests(WithScenarios, TestCase):
 
     scenarios = [
-        (cmd, dict(command=cmd)) for cmd in (
+        (cmd, dict(command=cmd, option='--' + cmd)) for cmd in (
             'exists',
-            'xfail',
             'fail',
-            'success',
-            'skip',
             'inprogress',
+            'skip',
+            'success',
             'uxsuccess',
+            'xfail',
         )
     ]
 
-    def _test_command(self, command, test_id):
-        args = safe_parse_arguments(args=[command, test_id])
+    def test_can_parse_all_commands_with_test_id(self):
+        test_id = self.getUniqueString()
+        args = safe_parse_arguments(args=[self.option, test_id])
 
-        self.assertThat(args.action, Equals(command))
+        self.assertThat(args.action, Equals(self.command))
         self.assertThat(args.test_id, Equals(test_id))
 
-    def test_can_parse_all_commands_with_test_id(self):
-        self._test_command(self.command, self.getUniqueString())
-
     def test_all_commands_parse_file_attachment(self):
         with NamedTemporaryFile() as tmp_file:
             args = safe_parse_arguments(
-                args=[self.command, 'foo', '--attach-file', tmp_file.name]
+                args=[self.option, 'foo', '--attach-file', tmp_file.name]
             )
             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
 
     def test_all_commands_accept_mimetype_argument(self):
         with NamedTemporaryFile() as tmp_file:
             args = safe_parse_arguments(
-                args=[self.command, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
+                args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
             )
             self.assertThat(args.mimetype, Equals("text/plain"))
 
     def test_all_commands_accept_tags_argument(self):
         args = safe_parse_arguments(
-            args=[self.command, 'foo', '--tags', "foo,bar,baz"]
+            args=[self.option, 'foo', '--tags', "foo,bar,baz"]
         )
         self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
 
     def test_attach_file_with_hyphen_opens_stdin(self):
         self.patch(_o, 'stdin', StringIO(_u("Hello")))
         args = safe_parse_arguments(
-            args=[self.command, "foo", "--attach-file", "-"]
+            args=[self.option, "foo", "--attach-file", "-"]
         )
 
         self.assertThat(args.attach_file.read(), Equals("Hello"))
 
 
-class GlobalFileAttachmentTests(TestCase):
+class ArgParserTests(TestCase):
+
+    def setUp(self):
+        super(ArgParserTests, self).setUp()
+        # prevent ARgumentParser from printing to stderr:
+        self._stderr = BytesIO()
+        self.patch(argparse._sys, 'stderr', self._stderr)
 
     def test_can_parse_attach_file_without_test_id(self):
         with NamedTemporaryFile() as tmp_file:
@@ -118,145 +120,97 @@ class GlobalFileAttachmentTests(TestCase):
             )
             self.assertThat(args.attach_file.name, Equals(tmp_file.name))
 
-class ByteStreamCompatibilityTests(TestCase):
-
-    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
-
-    def setUp(self):
-        super(ByteStreamCompatibilityTests, self).setUp()
-        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
-
-    def _get_result_for(self, *commands):
-        """Get a result object from *commands.
-
-        Runs the 'generate_bytestream' function from subunit._output after
-        parsing *commands as if they were specified on the command line. The
-        resulting bytestream is then converted back into a result object and
-        returned.
-        """
-        stream = BytesIO()
-
-        for command_list in commands:
-            args = safe_parse_arguments(command_list)
-            output_writer = StreamResultToBytes(output_stream=stream)
-            generate_bytestream(args, output_writer)
-
-        stream.seek(0)
-
-        case = ByteStreamToStreamResult(source=stream)
-        result = StreamResult()
-        case.run(result)
-        return result
-
-    def test_start_generates_inprogress(self):
-        result = self._get_result_for(
-            ['inprogress', 'foo'],
+    def test_cannot_specify_more_than_one_status_command(self):
+        fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
+        self.assertThat(
+            fn,
+            raises(RuntimeError('subunit-output: error: argument --skip: '\
+                'Only one status may be specified at once.\n'))
         )
 
+    def test_cannot_specify_mimetype_without_attach_file(self):
+        fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
         self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='inprogress',
-                timestamp=self._dummy_timestamp,
-            )
+            fn,
+            raises(RuntimeError('subunit-output: error: Cannot specify '\
+                '--mimetype without --attach-file\n'))
         )
 
-    def test_pass_generates_success(self):
-        result = self._get_result_for(
-            ['success', 'foo'],
+    def test_cannot_specify_filename_without_attach_file(self):
+        fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
+        self.assertThat(
+            fn,
+            raises(RuntimeError('subunit-output: error: Cannot specify '\
+                '--file-name without --attach-file\n'))
         )
 
+    def test_cannot_specify_tags_without_status_command(self):
+        fn = lambda: safe_parse_arguments(['--tags', 'foo'])
         self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='success',
-                timestamp=self._dummy_timestamp,
-            )
+            fn,
+            raises(RuntimeError('subunit-output: error: Cannot specify '\
+                '--tags without a status command\n'))
         )
 
-    def test_fail_generates_fail(self):
-        result = self._get_result_for(
-            ['fail', 'foo'],
-        )
 
-        self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='fail',
-                timestamp=self._dummy_timestamp,
-            )
-        )
+def get_result_for(commands):
+    """Get a result object from *commands.
 
-    def test_skip_generates_skip(self):
-        result = self._get_result_for(
-            ['skip', 'foo'],
-        )
+    Runs the 'generate_bytestream' function from subunit._output after
+    parsing *commands as if they were specified on the command line. The
+    resulting bytestream is then converted back into a result object and
+    returned.
+    """
+    stream = BytesIO()
 
-        self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='skip',
-                timestamp=self._dummy_timestamp,
-            )
-        )
+    args = safe_parse_arguments(commands)
+    output_writer = StreamResultToBytes(output_stream=stream)
+    generate_bytestream(args, output_writer)
 
-    def test_exists_generates_exists(self):
-        result = self._get_result_for(
-            ['exists', 'foo'],
-        )
+    stream.seek(0)
 
-        self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='exists',
-                timestamp=self._dummy_timestamp,
-            )
-        )
+    case = ByteStreamToStreamResult(source=stream)
+    result = StreamResult()
+    case.run(result)
+    return result
 
-    def test_expected_fail_generates_xfail(self):
-        result = self._get_result_for(
-            ['xfail', 'foo'],
-        )
 
-        self.assertThat(
-            result._events[0],
-            MatchesCall(
-                call='status',
-                test_id='foo',
-                test_status='xfail',
-                timestamp=self._dummy_timestamp,
-            )
-        )
+class ByteStreamCompatibilityTests(WithScenarios, TestCase):
 
-    def test_unexpected_success_generates_uxsuccess(self):
-        result = self._get_result_for(
-            ['uxsuccess', 'foo'],
+    scenarios = [
+        (s, dict(status=s, option='--' + s)) for s in (
+            'exists',
+            'fail',
+            'inprogress',
+            'skip',
+            'success',
+            'uxsuccess',
+            'xfail',
         )
+    ]
+
+    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
+
+    def setUp(self):
+        super(ByteStreamCompatibilityTests, self).setUp()
+        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
+
+
+    def test_correct_status_is_generated(self):
+        result = get_result_for([self.option, 'foo'])
 
         self.assertThat(
             result._events[0],
             MatchesCall(
                 call='status',
                 test_id='foo',
-                test_status='uxsuccess',
+                test_status=self.status,
                 timestamp=self._dummy_timestamp,
             )
         )
 
-    def test_tags_are_generated(self):
-        result = self._get_result_for(
-            ['exists', 'foo', '--tags', 'hello,world']
-        )
+    def test_all_commands_accept_tags(self):
+        result = get_result_for([self.option, 'foo', '--tags', 'hello,world'])
         self.assertThat(
             result._events[0],
             MatchesCall(
@@ -268,9 +222,14 @@ class ByteStreamCompatibilityTests(TestCase):
         )
 
 
-class FileChunkingTests(TestCase):
+class FileChunkingTests(WithScenarios, TestCase):
 
-    def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None):
+    scenarios = [
+        ("With test_id", dict(test_id="foo")),
+        ("Without test_id", dict(test_id=None)),
+    ]
+
+    def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None, test_id=None):
         """Write file data to a subunit stream, get a StreamResult object."""
         stream = BytesIO()
         output_writer = StreamResultToBytes(output_stream=stream)
@@ -285,7 +244,7 @@ class FileChunkingTests(TestCase):
                 output_writer=output_writer,
                 chunk_size=chunk_size,
                 mime_type=mimetype,
-                test_id='foo_test',
+                test_id=test_id,
                 file_name=filename,
             )
 
@@ -297,36 +256,48 @@ class FileChunkingTests(TestCase):
         return result
 
     def test_file_chunk_size_is_honored(self):
-        result = self._write_chunk_file(file_data=_b("Hello"), chunk_size=1)
+        result = self._write_chunk_file(
+            file_data=_b("Hello"),
+            chunk_size=1,
+            test_id=self.test_id,
+        )
         self.assertThat(
             result._events,
             MatchesListwise([
-                MatchesCall(call='status', file_bytes=_b('H'), mime_type=None, eof=False),
-                MatchesCall(call='status', file_bytes=_b('e'), mime_type=None, eof=False),
-                MatchesCall(call='status', file_bytes=_b('l'), mime_type=None, eof=False),
-                MatchesCall(call='status', file_bytes=_b('l'), mime_type=None, eof=False),
-                MatchesCall(call='status', file_bytes=_b('o'), mime_type=None, eof=False),
-                MatchesCall(call='status', file_bytes=_b(''), mime_type=None, eof=True),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('H'), mime_type=None, eof=False),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('e'), mime_type=None, eof=False),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('o'), mime_type=None, eof=False),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type=None, eof=True),
             ])
         )
 
     def test_file_mimetype_is_honored(self):
-        result = self._write_chunk_file(file_data=_b("SomeData"), mimetype="text/plain")
+        result = self._write_chunk_file(
+            file_data=_b("SomeData"),
+            mimetype="text/plain",
+            test_id=self.test_id,
+        )
         self.assertThat(
             result._events,
             MatchesListwise([
-                MatchesCall(call='status', file_bytes=_b('SomeData'), mime_type="text/plain"),
-                MatchesCall(call='status', file_bytes=_b(''), mime_type="text/plain"),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('SomeData'), mime_type="text/plain"),
+                MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type="text/plain"),
             ])
         )
 
     def test_file_name_is_honored(self):
-        result = self._write_chunk_file(file_data=_b("data"), filename="/some/name")
+        result = self._write_chunk_file(
+            file_data=_b("data"),
+            filename="/some/name",
+            test_id=self.test_id
+        )
         self.assertThat(
             result._events,
             MatchesListwise([
-                MatchesCall(call='status', file_name='/some/name'),
-                MatchesCall(call='status', file_name='/some/name'),
+                MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
+                MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
             ])
         )