from subunit.v2 import StreamResultToBytes
+
def output_main():
args = parse_arguments()
output = get_output_stream_writer()
)
common_args.add_argument(
"--mimetype",
- help="The mime type to send with this file. This is only used if the "\
- "--attach-file argument is used. This argument is optional. If it is "\
+ help="The mime type to send with this file. This is only used if the "
+ "--attach-file argument is used. This argument is optional. If it is "
"not specified, the file will be sent wihtout a mime type.",
default=None
)
description="These actions are supported by this tool",
)
- final_state = "This is a final action: No more actions may be generated " \
+ final_state = "This is a final action: No more actions may be generated "\
"for this test id after this one."
parser_start = sub_parsers.add_parser(
parser_expected_fail = sub_parsers.add_parser(
"expected-fail",
- help="Marks a test as failing expectedly (this is not counted as a "\
- "failure). " + final_state,
+ help="Marks a test as failing expectedly (this is not counted as a "
+ "failure). " + final_state,
parents=[common_args],
)
parser_unexpected_success = sub_parsers.add_parser(
"unexpected-success",
- help="Marks a test as succeeding unexpectedly (this is counted as a "\
- "failure). " + final_state,
+ help="Marks a test as succeeding unexpectedly (this is counted as a "
+ "failure). " + final_state,
parents=[common_args],
)
def write_chunked_file(file_obj, test_id, output_writer, chunk_size=1024,
- mime_type=None):
+ mime_type=None):
reader = partial(file_obj.read, chunk_size)
for chunk in iter(reader, ''):
output_writer.status(
eof=False,
)
output_writer.status(
- test_id=test_id,
- file_name=file_obj.name,
- file_bytes='',
- mime_type=mime_type,
- eof=True,
- )
+ test_id=test_id,
+ file_name=file_obj.name,
+ file_bytes='',
+ mime_type=mime_type,
+ eof=True,
+ )
_ZERO = datetime.timedelta(0)
class UTC(datetime.tzinfo):
- """UTC"""
+
def utcoffset(self, dt):
return _ZERO
+
def tzname(self, dt):
return "UTC"
+
def dst(self, dt):
return _ZERO
"""An ArgumentParser class that doesn't call sys.exit."""
def exit(self, status=0, message=""):
- raise RuntimeError("ArgumentParser requested to exit with status "\
- " %d and message %r" % (status, message))
+ raise RuntimeError(
+ "ArgumentParser requested to exit with status %d and message %r"
+ % (status, message)
+ )
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
self._test_command(command, self.getUniqueString())
def test_command_translation(self):
- self.assertThat(translate_command_name('start'), Equals('inprogress'))
- self.assertThat(translate_command_name('pass'), Equals('success'))
- self.assertThat(translate_command_name('expected-fail'), Equals('xfail'))
- self.assertThat(translate_command_name('unexpected-success'), Equals('uxsuccess'))
+ self.assertThat(
+ translate_command_name('start'),
+ Equals('inprogress')
+ )
+ self.assertThat(
+ translate_command_name('pass'),
+ Equals('success')
+ )
+ self.assertThat(
+ translate_command_name('expected-fail'),
+ Equals('xfail')
+ )
+ self.assertThat(
+ translate_command_name('unexpected-success'),
+ Equals('uxsuccess')
+ )
for command in ('fail', 'skip', 'exists'):
self.assertThat(translate_command_name(command), Equals(command))
args = safe_parse_arguments(
args=[command, 'foo', '--tags', "foo,bar,baz"]
)
- self.assertThat(args.tags, Equals(["foo","bar","baz"]))
+ self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
class ByteStreamCompatibilityTests(TestCase):
MatchesCall(
call='status',
test_id='foo',
- test_tags=set(['hello','world']),
+ test_tags=set(['hello', 'world']),
timestamp=self._dummy_timestamp,
)
)
class FileChunkingTests(TestCase):
def _write_chunk_file(self, file_data, chunk_size, mimetype=None):
- """Write chunked data to a subunit stream, return a StreamResult object."""
+ """Write file data to a subunit stream, get a StreamResult object."""
stream = BytesIO()
output_writer = StreamResultToBytes(output_stream=stream)
f.write(file_data)
f.seek(0)
- write_chunked_file(f, 'foo_test', output_writer, chunk_size, mimetype)
+ write_chunked_file(
+ f,
+ 'foo_test',
+ output_writer,
+ chunk_size,
+ mimetype
+ )
stream.seek(0)
try:
pos = self._position_lookup[k]
if call_tuple[pos] != v:
- return Mismatch("Value for key is %r, not %r" % (call_tuple[pos], v))
+ return Mismatch(
+ "Value for key is %r, not %r" % (call_tuple[pos], v)
+ )
except IndexError:
return Mismatch("Key %s is not present." % k)