Start on an encoder.
authorRobert Collins <robertc@robertcollins.net>
Fri, 22 Feb 2013 05:09:06 +0000 (18:09 +1300)
committerRobert Collins <robertc@robertcollins.net>
Fri, 22 Feb 2013 05:09:06 +0000 (18:09 +1300)
python/subunit/__init__.py
python/subunit/tests/__init__.py
python/subunit/tests/test_test_protocol2.py [new file with mode: 0644]
python/subunit/v2.py [new file with mode: 0644]

index 77f4a821c1f0dcc7600cd4a190ee907a5625163c..1a0b956ffc8c30aac522d48c026509c13b7f4d34 100644 (file)
@@ -146,6 +146,7 @@ except ImportError:
 from testtools import testresult
 
 from subunit import chunked, details, iso8601, test_results
+from subunit.v2 import StreamResultToBytes
 
 # same format as sys.version_info: "A tuple containing the five components of
 # the version number: major, minor, micro, releaselevel, and serial. All
index e0e1eb1b040573f1152d33ac48964e40e2fc489c..427522986663b89b76a869c63c667903ef3cc790 100644 (file)
@@ -25,6 +25,7 @@ from subunit.tests import (
     test_subunit_tags,
     test_tap2subunit,
     test_test_protocol,
+    test_test_protocol2,
     test_test_results,
     )
 
@@ -35,6 +36,7 @@ def test_suite():
     result.addTest(test_progress_model.test_suite())
     result.addTest(test_test_results.test_suite())
     result.addTest(test_test_protocol.test_suite())
+    result.addTest(test_test_protocol2.test_suite())
     result.addTest(test_tap2subunit.test_suite())
     result.addTest(test_subunit_filter.test_suite())
     result.addTest(test_subunit_tags.test_suite())
diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py
new file mode 100644 (file)
index 0000000..9409ba2
--- /dev/null
@@ -0,0 +1,47 @@
+#
+#  subunit: extensions to Python unittest to get test results from subprocesses.
+#  Copyright (C) 2013  Robert Collins <robertc@robertcollins.net>
+#
+#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+#  license at the users choice. A copy of both licenses are available in the
+#  project source as Apache-2.0 and BSD. You may not use this file except in
+#  compliance with one of these two licences.
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+#  license you chose for the specific language governing permissions and
+#  limitations under that license.
+#
+
+from io import BytesIO
+
+from testtools import TestCase
+from testtools.tests.test_testresult import TestStreamResultContract
+
+import subunit
+
+class TestStreamResultToBytesContract(TestCase, TestStreamResultContract):
+    """Check that StreamResult behaves as testtools expects."""
+
+    def _make_result(self):
+        return subunit.StreamResultToBytes(BytesIO())
+
+
+class TestStreamResultToBytes(TestCase):
+
+    def _make_result(self):
+        output = BytesIO()
+        return subunit.StreamResultToBytes(output), output
+
+    def test_trivial_enumeration(self):
+        result, output = self._make_result()
+        result.status("foo", 'exists')
+        self.assertEqual(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f',
+            output.getvalue())
+
+
+def test_suite():
+    loader = subunit.tests.TestUtil.TestLoader()
+    result = loader.loadTestsFromName(__name__)
+    return result
diff --git a/python/subunit/v2.py b/python/subunit/v2.py
new file mode 100644 (file)
index 0000000..78d36b6
--- /dev/null
@@ -0,0 +1,112 @@
+#
+#  subunit: extensions to Python unittest to get test results from subprocesses.
+#  Copyright (C) 2013  Robert Collins <robertc@robertcollins.net>
+#
+#  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+#  license at the users choice. A copy of both licenses are available in the
+#  project source as Apache-2.0 and BSD. You may not use this file except in
+#  compliance with one of these two licences.
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+#  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+#  license you chose for the specific language governing permissions and
+#  limitations under that license.
+#
+
+import struct
+import zlib
+
+__all__ = [
+    'StreamResultToBytes',
+    ]
+
+
+class StreamResultToBytes(object):
+    """Convert StreamResult API calls to bytes.
+    
+    The StreamResult API is defined by testtools.StreamResult.
+    """
+
+    status_mask = {
+        None: 0,
+        'exists': 0x1,
+        'inprogress': 0x2,
+        'success': 0x3,
+        'uxsuccess': 0x4,
+        'skip': 0x5,
+        'fail': 0x6,
+        'xfail': 0x7,
+        }
+
+    fmt_16 = '>H'
+    fmt_32 = '>I'
+    zero_b = b'\0'[0]
+
+    def __init__(self, output_stream):
+        """Create a StreamResultToBytes with output written to output_stream.
+
+        :param output_stream: A file-like object. Must support write(bytes)
+            and flush() methods. Flush will be called after each write.
+        """
+        self.output_stream = output_stream
+
+    def startTestRun(self):
+        pass
+
+    def stopTestRun(self):
+        pass
+
+    def file(stream, file_name, file_bytes, eof=False, mime_type=None,
+        test_id=None, route_code=None, timestamp=None):
+        pass
+
+    def status(self, test_id, test_status, test_tags=None, runnable=True,
+        route_code=None, timestamp=None):
+        self._write_packet(test_id=test_id, test_status=test_status,
+            test_tags=test_tags, runnable=runnable, route_code=route_code,
+            timestamp=timestamp)
+
+    def _write_utf8(self, a_string, packet):
+        utf8 = a_string.encode('utf-8')
+        assert len(utf8) < 65536
+        packet.append(struct.pack(self.fmt_16, len(utf8)))
+        packet.append(utf8)
+
+    def _write_packet(self, test_id=None, test_status=None, test_tags=None,
+        runnable=True, route_code=None, timestamp=None):
+        packet = [b'\xb3']
+        packet.append(b'FF') # placeholder for flags
+        packet.append(b'FFF') # placeholder for length
+        flags = 0x2000 # Version 0x2
+        if test_id is not None:
+            flags = flags | 0x0800
+            self._write_utf8(test_id, packet)
+        if route_code is not None:
+            flags = flags | 0x0400
+        if timestamp is not None:
+            flags = flags | 0x0200
+        if runnable:
+            flags = flags | 0x0100
+        if test_tags:
+            flags = flags | 0x0080
+        #if file_content:
+        #    flags = flags | 0x0040
+        #if mime_type:
+        #    flags = flags | 0x0020
+        # if eof: 
+        #    flags = flags | 0x0010
+        # 0x0008 - not used in v2.
+        flags = flags | self.status_mask[test_status]
+        packet[1] = struct.pack(self.fmt_16, flags)
+        length = struct.pack(self.fmt_32, sum(map(len, packet)) + 4)
+        assert length[0] == self.zero_b
+        packet[2] = length[1:]
+        # We could either do a partial application of crc32 over each chunk
+        # or a single join to a temp variable then a final join 
+        # or two writes (that python might then split).
+        # For now, simplest code: join, crc32, join, output
+        content = b''.join(packet)
+        self.output_stream.write(content + struct.pack(
+            self.fmt_32, zlib.crc32(content) & 0xffffffff))
+        self.output_stream.flush()