Add a few missing tests.
[third_party/subunit] / python / subunit / _output.py
index c65fbe066ff3bfc274af284c32c992176a45b869..24d63dcd853f9906b62bfe01e4fba3c1e0aa5fae 100644 (file)
@@ -1,5 +1,5 @@
 #  subunit: extensions to python unittest to get test results from subprocesses.
-#  Copyright (C) 2013 'Subunit Contributors'
+#  Copyright (C) 2013 Subunit Contributors
 #
 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
 #  license at the users choice. A copy of both licenses are available in the
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 #  license you chose for the specific language governing permissions and
 #  limitations under that license.
+#
 
-from argparse import ArgumentParser
 import datetime
 from functools import partial
-from sys import stdin, stdout
-
-from testtools.compat import _b
+from optparse import (
+    OptionGroup,
+    OptionParser,
+    OptionValueError,
+)
+import sys
 
 from subunit.iso8601 import UTC
 from subunit.v2 import StreamResultToBytes
 
 
+_FINAL_ACTIONS = frozenset([
+    'exists',
+    'fail',
+    'skip',
+    'success',
+    'uxsuccess',
+    'xfail',
+])
+_ALL_ACTIONS = _FINAL_ACTIONS.union(['inprogress'])
+_CHUNK_SIZE=3670016 # 3.5 MiB
+
+
 def output_main():
     args = parse_arguments()
-    output = get_output_stream_writer()
-    generate_bytestream(args, output)
+    output = StreamResultToBytes(sys.stdout)
+    generate_stream_results(args, output)
     return 0
 
 
-def parse_arguments(args=None, ParserClass=ArgumentParser):
+def parse_arguments(args=None, ParserClass=OptionParser):
     """Parse arguments from the command line.
 
     If specified, args must be a list of strings, similar to sys.argv[1:].
 
-    ParserClass can be specified to override the class we use to parse the
+    ParserClass may be specified to override the class we use to parse the
     command-line arguments. This is useful for testing.
     """
-
-    file_args = ParserClass(add_help=False)
-    file_args.add_argument(
+    parser = ParserClass(
+        prog="subunit-output",
+        description="A tool to generate a subunit v2 result byte-stream",
+        usage="subunit-output [-h] [status test_id] [options]",
+    )
+    parser.set_default('tags', None)
+    parser.set_default('test_id', None)
+
+    status_commands = OptionGroup(
+        parser,
+        "Status Commands",
+        "These options report the status of a test. TEST_ID must be a string "
+            "that uniquely identifies the test."
+    )
+    for action_name in _ALL_ACTIONS:
+        status_commands.add_option(
+            "--%s" % action_name,
+            nargs=1,
+            action="callback",
+            callback=set_status_cb,
+            callback_args=(action_name,),
+            dest="action",
+            metavar="TEST_ID",
+            help="Report a test status."
+        )
+    parser.add_option_group(status_commands)
+
+    file_commands = OptionGroup(
+        parser,
+        "File Options",
+        "These options control attaching data to a result stream. They can "
+            "either be specified with a status command, in which case the file "
+            "is attached to the test status, or by themselves, in which case "
+            "the file is attached to the stream (and not associated with any "
+            "test id)."
+    )
+    file_commands.add_option(
         "--attach-file",
         help="Attach a file to the result stream for this test. If '-' is "
             "specified, stdin will be read instead. In this case, the file "
             "name will be set to 'stdin' (but can still be overridden with "
             "the --file-name option)."
     )
-    file_args.add_argument(
+    file_commands.add_option(
         "--file-name",
         help="The name to give this file attachment. If not specified, the "
             "name of the file on disk will be used, or 'stdin' in the case "
             "where '-' was passed to the '--attach-file' argument. This option"
             " may only be specified when '--attach-file' is specified.",
         )
-    file_args.add_argument(
+    file_commands.add_option(
         "--mimetype",
         help="The mime type to send with this file. This is only used if the "
             "--attach-file argument is used. This argument is optional. If it "
@@ -62,150 +111,111 @@ def parse_arguments(args=None, ParserClass=ArgumentParser):
             "option may only be specified when '--attach-file' is specified.",
         default=None
     )
+    parser.add_option_group(file_commands)
 
-    common_args = ParserClass(add_help=False)
-    common_args.add_argument(
-        "test_id",
-        help="A string that uniquely identifies this test."
-    )
-    common_args.add_argument(
+    parser.add_option(
         "--tags",
-        help="A comma-separated list of tags to associate with this test.",
-        type=lambda s: s.split(','),
-        default=None
+        help="A comma-separated list of tags to associate with a test. This "
+            "option may only be used with a status command.",
+        action="callback",
+        callback=set_tags_cb,
+        default=[]
     )
 
-    parser = ParserClass(
-        prog='subunit-output',
-        description="A tool to generate a subunit result byte-stream",
-        usage="%(prog)s [-h] action [-h] test [--attach-file ATTACH_FILE]"
-            "[--mimetype MIMETYPE] [--tags TAGS]",
-        epilog="Additional help can be printed by passing -h to an action"
-            "(e.g.- '%(prog)s pass -h' will show help for the 'pass' action).",
-        parents=[file_args]
-    )
-    sub_parsers = parser.add_subparsers(
-        dest="action",
-        title="actions",
-        description="These actions are supported by this tool",
-    )
+    (options, args) = parser.parse_args(args)
+    if options.mimetype and not options.attach_file:
+        parser.error("Cannot specify --mimetype without --attach-file")
+    if options.file_name and not options.attach_file:
+        parser.error("Cannot specify --file-name without --attach-file")
+    if options.attach_file:
+        if options.attach_file == '-':
+            if not options.file_name:
+                options.file_name = 'stdin'
+            options.attach_file = sys.stdin
+        else:
+            try:
+                options.attach_file = open(options.attach_file)
+            except IOError as e:
+                parser.error("Cannot open %s (%s)" % (options.attach_file, e.strerror))
+    if options.tags and not options.action:
+        parser.error("Cannot specify --tags without a status command")
+    if not (options.attach_file or options.action):
+        parser.error("Must specify either --attach-file or a status command")
 
-    final_state = "This is a final action: No more actions may be generated "\
-        "for this test id after this one."
+    return options
 
-    sub_parsers.add_parser(
-        "inprogress",
-        help="Report that a test is in progress.",
-        parents=[common_args, file_args]
-    )
 
-    sub_parsers.add_parser(
-        "success",
-        help="Report that a test has succeeded. " + final_state,
-        parents=[common_args, file_args],
-    )
+def set_status_cb(option, opt_str, value, parser, status_name):
+    if getattr(parser.values, "action", None) is not None:
+        raise OptionValueError("argument %s: Only one status may be specified at once." % option)
 
-    sub_parsers.add_parser(
-        "fail",
-        help="Report that a test has failed. " + final_state,
-        parents=[common_args, file_args]
-    )
+    if len(parser.rargs) == 0:
+        raise OptionValueError("argument %s: must specify a single TEST_ID.")
+    parser.values.action = status_name
+    parser.values.test_id = parser.rargs.pop(0)
 
-    sub_parsers.add_parser(
-        "skip",
-        help="Report that a test was skipped. " + final_state,
-        parents=[common_args, file_args]
-    )
 
-    sub_parsers.add_parser(
-        "exists",
-        help="Report that a test exists. " + final_state,
-        parents=[common_args, file_args]
-    )
+def set_tags_cb(option, opt_str, value, parser):
+    parser.values.tags = parser.rargs.pop(0).split(',')
 
-    sub_parsers.add_parser(
-        "xfail",
-        help="Report that a test has failed expectedly (this is not counted as "
-            "a failure). " + final_state,
-        parents=[common_args, file_args],
-    )
 
-    sub_parsers.add_parser(
-        "uxsuccess",
-        help="Report that a test has succeeded unexpectedly (this is counted "
-            " as a failure). " + final_state,
-        parents=[common_args, file_args],
-    )
+def generate_stream_results(args, output_writer):
+    output_writer.startTestRun()
 
-    args = parser.parse_args(args)
-    if args.mimetype and not args.attach_file:
-        parser.error("Cannot specify --mimetype without --attach_file")
-    if args.file_name and not args.attach_file:
-        parser.error("Cannot specify --file-name without --attach_file")
     if args.attach_file:
-        if args.attach_file == '-':
-            if not args.file_name:
-                args.file_name = 'stdin'
-            args.attach_file = stdin
+        reader = partial(args.attach_file.read, _CHUNK_SIZE)
+        this_file_hunk = reader().encode('utf8')
+        next_file_hunk = reader().encode('utf8')
+
+    is_first_packet = True
+    is_last_packet = False
+    while not is_last_packet:
+
+        # XXX
+        def logme(*args, **kwargs):
+            print(args, kwargs)
+            output_writer.status(*args, **kwargs)
+        write_status = output_writer.status
+
+        if is_first_packet:
+            if args.attach_file:
+                # mimetype is specified on the first chunk only:
+                if args.mimetype:
+                    write_status = partial(write_status, mime_type=args.mimetype)
+            # tags are only written on the first packet:
+            if args.tags:
+                write_status = partial(write_status, test_tags=args.tags)
+            # timestamp is specified on the first chunk as well:
+            write_status = partial(write_status, timestamp=create_timestamp())
+            if args.action not in _FINAL_ACTIONS:
+                write_status = partial(write_status, test_status=args.action)
+            is_first_packet = False
+
+        if args.attach_file:
+            # filename might be overridden by the user
+            filename = args.file_name or args.attach_file.name
+            write_status = partial(write_status, file_name=filename, file_bytes=this_file_hunk)
+            if next_file_hunk == b'':
+                write_status = partial(write_status, eof=True)
+                is_last_packet = True
+            else:
+                this_file_hunk = next_file_hunk
+                next_file_hunk = reader().encode('utf8')
         else:
-            try:
-                args.attach_file = open(args.attach_file)
-            except IOError as e:
-                parser.error("Cannot open %s (%s)" % (args.attach_file, e.strerror))
-    return args
+            is_last_packet = True
 
+        if args.test_id:
+            write_status = partial(write_status, test_id=args.test_id)
 
-def get_output_stream_writer():
-    return StreamResultToBytes(stdout)
+        if is_last_packet:
+            write_status = partial(write_status, eof=True)
+            if args.action in _FINAL_ACTIONS:
+                write_status = partial(write_status, test_status=args.action)
 
+        write_status()
 
-def generate_bytestream(args, output_writer):
-    output_writer.startTestRun()
-    if args.attach_file:
-        write_chunked_file(
-            file_obj=args.attach_file,
-            test_id=args.test_id,
-            output_writer=output_writer,
-            mime_type=args.mimetype,
-        )
-    output_writer.status(
-        test_id=args.test_id,
-        test_status=args.action,
-        timestamp=create_timestamp(),
-        test_tags=args.tags,
-        )
     output_writer.stopTestRun()
 
 
-def write_chunked_file(file_obj, output_writer, chunk_size=1024,
-                       mime_type=None, test_id=None, file_name=None):
-    reader = partial(file_obj.read, chunk_size)
-
-    write_status = output_writer.status
-    if mime_type is not None:
-        write_status = partial(
-            write_status,
-            mime_type=mime_type
-        )
-    if test_id is not None:
-        write_status = partial(
-            write_status,
-            test_id=test_id
-        )
-    filename = file_name if file_name else file_obj.name
-
-    for chunk in iter(reader, _b('')):
-        write_status(
-            file_name=filename,
-            file_bytes=chunk,
-            eof=False,
-        )
-    write_status(
-        file_name=filename,
-        file_bytes=_b(''),
-        eof=True,
-    )
-
-
 def create_timestamp():
     return datetime.datetime.now(UTC)