# license you chose for the specific language governing permissions and
# limitations under that license.
-from argparse import ArgumentParser
+from argparse import ArgumentError, ArgumentParser, Action
import datetime
from functools import partial
from sys import stdin, stdout
ParserClass can be specified to override the class we use to parse the
command-line arguments. This is useful for testing.
+
"""
- file_args = ParserClass(add_help=False)
- file_args.add_argument(
+ class StatusAction(Action):
+ """A custom action that stores option name and argument separately.
+
+ This is part of a workaround for the fact that argparse does not
+ support optional subcommands (http://bugs.python.org/issue9253).
+ """
+
+ def __init__(self, status_name, *args, **kwargs):
+ super(StatusAction, self).__init__(*args, **kwargs)
+ self._status_name = status_name
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ if getattr(namespace, self.dest, None) is not None:
+ raise ArgumentError(self, "Only one status may be specified at once.")
+ setattr(namespace, self.dest, self._status_name)
+ setattr(namespace, 'test_id', values[0])
+
+
+ parser = ParserClass(
+ prog='subunit-output',
+ description="A tool to generate a subunit result byte-stream",
+ )
+
+ status_commands = parser.add_argument_group(
+ "Status Commands",
+ "These options report the status of a test. TEST_ID must be a string "
+ "that uniquely identifies the test."
+ )
+ final_actions = 'success fail skip xfail uxsuccess'.split()
+ for action in "inprogress success fail skip exists xfail uxsuccess".split():
+ final_text = "This is a final state: No more status reports may "\
+ "be generated for this test id after this one."
+
+ status_commands.add_argument(
+ "--%s" % action,
+ nargs=1,
+ action=partial(StatusAction, action),
+ dest="action",
+ metavar="TEST_ID",
+ help="Report a test status." + final_text if action in final_actions else ""
+ )
+
+ file_commands = parser.add_argument_group(
+ "File Options",
+ "These options control attaching data to a result stream. They can "
+ "either be specified with a status command, in which case the file "
+ "is attached to the test status, or by themselves, in which case "
+ "the file is attached to the stream (and not associated with any "
+ "test id)."
+ )
+ file_commands.add_argument(
"--attach-file",
help="Attach a file to the result stream for this test. If '-' is "
"specified, stdin will be read instead. In this case, the file "
"name will be set to 'stdin' (but can still be overridden with "
"the --file-name option)."
)
- file_args.add_argument(
+ file_commands.add_argument(
"--file-name",
help="The name to give this file attachment. If not specified, the "
"name of the file on disk will be used, or 'stdin' in the case "
"where '-' was passed to the '--attach-file' argument. This option"
" may only be specified when '--attach-file' is specified.",
)
- file_args.add_argument(
+ file_commands.add_argument(
"--mimetype",
help="The mime type to send with this file. This is only used if the "
"--attach-file argument is used. This argument is optional. If it "
default=None
)
- common_args = ParserClass(add_help=False)
- common_args.add_argument(
- "test_id",
- help="A string that uniquely identifies this test."
- )
- common_args.add_argument(
+ parser.add_argument(
"--tags",
- help="A comma-separated list of tags to associate with this test.",
+ help="A comma-separated list of tags to associate with a test. This "
+ "option may only be used with a status command.",
type=lambda s: s.split(','),
default=None
)
- parser = ParserClass(
- prog='subunit-output',
- description="A tool to generate a subunit result byte-stream",
- usage="%(prog)s [-h] action [-h] test [--attach-file ATTACH_FILE]"
- "[--mimetype MIMETYPE] [--tags TAGS]",
- epilog="Additional help can be printed by passing -h to an action"
- "(e.g.- '%(prog)s pass -h' will show help for the 'pass' action).",
- parents=[file_args]
- )
- sub_parsers = parser.add_subparsers(
- dest="action",
- title="actions",
- description="These actions are supported by this tool",
- )
-
- final_state = "This is a final action: No more actions may be generated "\
- "for this test id after this one."
-
- sub_parsers.add_parser(
- "inprogress",
- help="Report that a test is in progress.",
- parents=[common_args, file_args]
- )
-
- sub_parsers.add_parser(
- "success",
- help="Report that a test has succeeded. " + final_state,
- parents=[common_args, file_args],
- )
-
- sub_parsers.add_parser(
- "fail",
- help="Report that a test has failed. " + final_state,
- parents=[common_args, file_args]
- )
-
- sub_parsers.add_parser(
- "skip",
- help="Report that a test was skipped. " + final_state,
- parents=[common_args, file_args]
- )
-
- sub_parsers.add_parser(
- "exists",
- help="Report that a test exists. " + final_state,
- parents=[common_args, file_args]
- )
-
- sub_parsers.add_parser(
- "xfail",
- help="Report that a test has failed expectedly (this is not counted as "
- "a failure). " + final_state,
- parents=[common_args, file_args],
- )
-
- sub_parsers.add_parser(
- "uxsuccess",
- help="Report that a test has succeeded unexpectedly (this is counted "
- " as a failure). " + final_state,
- parents=[common_args, file_args],
- )
-
args = parser.parse_args(args)
if args.mimetype and not args.attach_file:
- parser.error("Cannot specify --mimetype without --attach_file")
+ parser.error("Cannot specify --mimetype without --attach-file")
if args.file_name and not args.attach_file:
- parser.error("Cannot specify --file-name without --attach_file")
+ parser.error("Cannot specify --file-name without --attach-file")
if args.attach_file:
if args.attach_file == '-':
if not args.file_name:
args.attach_file = open(args.attach_file)
except IOError as e:
parser.error("Cannot open %s (%s)" % (args.attach_file, e.strerror))
+ if args.tags and not args.action:
+ parser.error("Cannot specify --tags without a status command")
+
return args
Matcher,
MatchesListwise,
Mismatch,
+ raises,
)
from testtools.testresult.doubles import StreamResult
"""An ArgumentParser class that doesn't call sys.exit."""
def exit(self, status=0, message=""):
- raise RuntimeError(
- "ArgumentParser requested to exit with status %d and message %r"
- % (status, message)
- )
+ raise RuntimeError(message)
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
class TestStatusArgParserTests(WithScenarios, TestCase):
scenarios = [
- (cmd, dict(command=cmd)) for cmd in (
+ (cmd, dict(command=cmd, option='--' + cmd)) for cmd in (
'exists',
- 'xfail',
'fail',
- 'success',
- 'skip',
'inprogress',
+ 'skip',
+ 'success',
'uxsuccess',
+ 'xfail',
)
]
- def _test_command(self, command, test_id):
- args = safe_parse_arguments(args=[command, test_id])
+ def test_can_parse_all_commands_with_test_id(self):
+ test_id = self.getUniqueString()
+ args = safe_parse_arguments(args=[self.option, test_id])
- self.assertThat(args.action, Equals(command))
+ self.assertThat(args.action, Equals(self.command))
self.assertThat(args.test_id, Equals(test_id))
- def test_can_parse_all_commands_with_test_id(self):
- self._test_command(self.command, self.getUniqueString())
-
def test_all_commands_parse_file_attachment(self):
with NamedTemporaryFile() as tmp_file:
args = safe_parse_arguments(
- args=[self.command, 'foo', '--attach-file', tmp_file.name]
+ args=[self.option, 'foo', '--attach-file', tmp_file.name]
)
self.assertThat(args.attach_file.name, Equals(tmp_file.name))
def test_all_commands_accept_mimetype_argument(self):
with NamedTemporaryFile() as tmp_file:
args = safe_parse_arguments(
- args=[self.command, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
+ args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
)
self.assertThat(args.mimetype, Equals("text/plain"))
def test_all_commands_accept_tags_argument(self):
args = safe_parse_arguments(
- args=[self.command, 'foo', '--tags', "foo,bar,baz"]
+ args=[self.option, 'foo', '--tags', "foo,bar,baz"]
)
self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
def test_attach_file_with_hyphen_opens_stdin(self):
self.patch(_o, 'stdin', StringIO(_u("Hello")))
args = safe_parse_arguments(
- args=[self.command, "foo", "--attach-file", "-"]
+ args=[self.option, "foo", "--attach-file", "-"]
)
self.assertThat(args.attach_file.read(), Equals("Hello"))
-class GlobalFileAttachmentTests(TestCase):
+class ArgParserTests(TestCase):
+
+ def setUp(self):
+ super(ArgParserTests, self).setUp()
+ # prevent ARgumentParser from printing to stderr:
+ self._stderr = BytesIO()
+ self.patch(argparse._sys, 'stderr', self._stderr)
def test_can_parse_attach_file_without_test_id(self):
with NamedTemporaryFile() as tmp_file:
)
self.assertThat(args.attach_file.name, Equals(tmp_file.name))
-class ByteStreamCompatibilityTests(TestCase):
-
- _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
-
- def setUp(self):
- super(ByteStreamCompatibilityTests, self).setUp()
- self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
-
- def _get_result_for(self, *commands):
- """Get a result object from *commands.
-
- Runs the 'generate_bytestream' function from subunit._output after
- parsing *commands as if they were specified on the command line. The
- resulting bytestream is then converted back into a result object and
- returned.
- """
- stream = BytesIO()
-
- for command_list in commands:
- args = safe_parse_arguments(command_list)
- output_writer = StreamResultToBytes(output_stream=stream)
- generate_bytestream(args, output_writer)
-
- stream.seek(0)
-
- case = ByteStreamToStreamResult(source=stream)
- result = StreamResult()
- case.run(result)
- return result
-
- def test_start_generates_inprogress(self):
- result = self._get_result_for(
- ['inprogress', 'foo'],
+ def test_cannot_specify_more_than_one_status_command(self):
+ fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('subunit-output: error: argument --skip: '\
+ 'Only one status may be specified at once.\n'))
)
+ def test_cannot_specify_mimetype_without_attach_file(self):
+ fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='inprogress',
- timestamp=self._dummy_timestamp,
- )
+ fn,
+ raises(RuntimeError('subunit-output: error: Cannot specify '\
+ '--mimetype without --attach-file\n'))
)
- def test_pass_generates_success(self):
- result = self._get_result_for(
- ['success', 'foo'],
+ def test_cannot_specify_filename_without_attach_file(self):
+ fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('subunit-output: error: Cannot specify '\
+ '--file-name without --attach-file\n'))
)
+ def test_cannot_specify_tags_without_status_command(self):
+ fn = lambda: safe_parse_arguments(['--tags', 'foo'])
self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='success',
- timestamp=self._dummy_timestamp,
- )
+ fn,
+ raises(RuntimeError('subunit-output: error: Cannot specify '\
+ '--tags without a status command\n'))
)
- def test_fail_generates_fail(self):
- result = self._get_result_for(
- ['fail', 'foo'],
- )
- self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='fail',
- timestamp=self._dummy_timestamp,
- )
- )
+def get_result_for(commands):
+ """Get a result object from *commands.
- def test_skip_generates_skip(self):
- result = self._get_result_for(
- ['skip', 'foo'],
- )
+ Runs the 'generate_bytestream' function from subunit._output after
+ parsing *commands as if they were specified on the command line. The
+ resulting bytestream is then converted back into a result object and
+ returned.
+ """
+ stream = BytesIO()
- self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='skip',
- timestamp=self._dummy_timestamp,
- )
- )
+ args = safe_parse_arguments(commands)
+ output_writer = StreamResultToBytes(output_stream=stream)
+ generate_bytestream(args, output_writer)
- def test_exists_generates_exists(self):
- result = self._get_result_for(
- ['exists', 'foo'],
- )
+ stream.seek(0)
- self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='exists',
- timestamp=self._dummy_timestamp,
- )
- )
+ case = ByteStreamToStreamResult(source=stream)
+ result = StreamResult()
+ case.run(result)
+ return result
- def test_expected_fail_generates_xfail(self):
- result = self._get_result_for(
- ['xfail', 'foo'],
- )
- self.assertThat(
- result._events[0],
- MatchesCall(
- call='status',
- test_id='foo',
- test_status='xfail',
- timestamp=self._dummy_timestamp,
- )
- )
+class ByteStreamCompatibilityTests(WithScenarios, TestCase):
- def test_unexpected_success_generates_uxsuccess(self):
- result = self._get_result_for(
- ['uxsuccess', 'foo'],
+ scenarios = [
+ (s, dict(status=s, option='--' + s)) for s in (
+ 'exists',
+ 'fail',
+ 'inprogress',
+ 'skip',
+ 'success',
+ 'uxsuccess',
+ 'xfail',
)
+ ]
+
+ _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
+
+ def setUp(self):
+ super(ByteStreamCompatibilityTests, self).setUp()
+ self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
+
+
+ def test_correct_status_is_generated(self):
+ result = get_result_for([self.option, 'foo'])
self.assertThat(
result._events[0],
MatchesCall(
call='status',
test_id='foo',
- test_status='uxsuccess',
+ test_status=self.status,
timestamp=self._dummy_timestamp,
)
)
- def test_tags_are_generated(self):
- result = self._get_result_for(
- ['exists', 'foo', '--tags', 'hello,world']
- )
+ def test_all_commands_accept_tags(self):
+ result = get_result_for([self.option, 'foo', '--tags', 'hello,world'])
self.assertThat(
result._events[0],
MatchesCall(
)
-class FileChunkingTests(TestCase):
+class FileChunkingTests(WithScenarios, TestCase):
- def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None):
+ scenarios = [
+ ("With test_id", dict(test_id="foo")),
+ ("Without test_id", dict(test_id=None)),
+ ]
+
+ def _write_chunk_file(self, file_data, chunk_size=1024, mimetype=None, filename=None, test_id=None):
"""Write file data to a subunit stream, get a StreamResult object."""
stream = BytesIO()
output_writer = StreamResultToBytes(output_stream=stream)
output_writer=output_writer,
chunk_size=chunk_size,
mime_type=mimetype,
- test_id='foo_test',
+ test_id=test_id,
file_name=filename,
)
return result
def test_file_chunk_size_is_honored(self):
- result = self._write_chunk_file(file_data=_b("Hello"), chunk_size=1)
+ result = self._write_chunk_file(
+ file_data=_b("Hello"),
+ chunk_size=1,
+ test_id=self.test_id,
+ )
self.assertThat(
result._events,
MatchesListwise([
- MatchesCall(call='status', file_bytes=_b('H'), mime_type=None, eof=False),
- MatchesCall(call='status', file_bytes=_b('e'), mime_type=None, eof=False),
- MatchesCall(call='status', file_bytes=_b('l'), mime_type=None, eof=False),
- MatchesCall(call='status', file_bytes=_b('l'), mime_type=None, eof=False),
- MatchesCall(call='status', file_bytes=_b('o'), mime_type=None, eof=False),
- MatchesCall(call='status', file_bytes=_b(''), mime_type=None, eof=True),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('H'), mime_type=None, eof=False),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('e'), mime_type=None, eof=False),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('l'), mime_type=None, eof=False),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('o'), mime_type=None, eof=False),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type=None, eof=True),
])
)
def test_file_mimetype_is_honored(self):
- result = self._write_chunk_file(file_data=_b("SomeData"), mimetype="text/plain")
+ result = self._write_chunk_file(
+ file_data=_b("SomeData"),
+ mimetype="text/plain",
+ test_id=self.test_id,
+ )
self.assertThat(
result._events,
MatchesListwise([
- MatchesCall(call='status', file_bytes=_b('SomeData'), mime_type="text/plain"),
- MatchesCall(call='status', file_bytes=_b(''), mime_type="text/plain"),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b('SomeData'), mime_type="text/plain"),
+ MatchesCall(call='status', test_id=self.test_id, file_bytes=_b(''), mime_type="text/plain"),
])
)
def test_file_name_is_honored(self):
- result = self._write_chunk_file(file_data=_b("data"), filename="/some/name")
+ result = self._write_chunk_file(
+ file_data=_b("data"),
+ filename="/some/name",
+ test_id=self.test_id
+ )
self.assertThat(
result._events,
MatchesListwise([
- MatchesCall(call='status', file_name='/some/name'),
- MatchesCall(call='status', file_name='/some/name'),
+ MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
+ MatchesCall(call='status', test_id=self.test_id, file_name='/some/name'),
])
)