The test suite was failing 6 tests due to testtools changing it's output
[third_party/subunit] / python / subunit / chunked.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #  Copyright (C) 2011  Martin Pool <mbp@sourcefrog.net>
5 #
6 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
7 #  license at the users choice. A copy of both licenses are available in the
8 #  project source as Apache-2.0 and BSD. You may not use this file except in
9 #  compliance with one of these two licences.
10 #
11 #  Unless required by applicable law or agreed to in writing, software
12 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
13 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
14 #  license you chose for the specific language governing permissions and
15 #  limitations under that license.
16 #
17
18 """Encoder/decoder for http style chunked encoding."""
19
20 from testtools.compat import _b
21
22 empty = _b('')
23
24 class Decoder(object):
25     """Decode chunked content to a byte stream."""
26
27     def __init__(self, output, strict=True):
28         """Create a decoder decoding to output.
29
30         :param output: A file-like object. Bytes written to the Decoder are
31             decoded to strip off the chunking and written to the output.
32             Up to a full write worth of data or a single control line may be
33             buffered (whichever is larger). The close method should be called
34             when no more data is available, to detect short streams; the
35             write method will return none-None when the end of a stream is
36             detected. The output object must accept bytes objects.
37
38         :param strict: If True (the default), the decoder will not knowingly
39             accept input that is not conformant to the HTTP specification.
40             (This does not imply that it will catch every nonconformance.)
41             If False, it will accept incorrect input that is still
42             unambiguous.
43         """
44         self.output = output
45         self.buffered_bytes = []
46         self.state = self._read_length
47         self.body_length = 0
48         self.strict = strict
49         self._match_chars = _b("0123456789abcdefABCDEF\r\n")
50         self._slash_n = _b('\n')
51         self._slash_r = _b('\r')
52         self._slash_rn = _b('\r\n')
53         self._slash_nr = _b('\n\r')
54
55     def close(self):
56         """Close the decoder.
57
58         :raises ValueError: If the stream is incomplete ValueError is raised.
59         """
60         if self.state != self._finished:
61             raise ValueError("incomplete stream")
62
63     def _finished(self):
64         """Finished reading, return any remaining bytes."""
65         if self.buffered_bytes:
66             buffered_bytes = self.buffered_bytes
67             self.buffered_bytes = []
68             return empty.join(buffered_bytes)
69         else:
70             raise ValueError("stream is finished")
71
72     def _read_body(self):
73         """Pass body bytes to the output."""
74         while self.body_length and self.buffered_bytes:
75             if self.body_length >= len(self.buffered_bytes[0]):
76                 self.output.write(self.buffered_bytes[0])
77                 self.body_length -= len(self.buffered_bytes[0])
78                 del self.buffered_bytes[0]
79                 # No more data available.
80                 if not self.body_length:
81                     self.state = self._read_length
82             else:
83                 self.output.write(self.buffered_bytes[0][:self.body_length])
84                 self.buffered_bytes[0] = \
85                     self.buffered_bytes[0][self.body_length:]
86                 self.body_length = 0
87                 self.state = self._read_length
88                 return self.state()
89
90     def _read_length(self):
91         """Try to decode a length from the bytes."""
92         count_chars = []
93         for bytes in self.buffered_bytes:
94             for pos in range(len(bytes)):
95                 byte = bytes[pos:pos+1]
96                 if byte not in self._match_chars:
97                     break
98                 count_chars.append(byte)
99                 if byte == self._slash_n:
100                     break
101         if not count_chars:
102             return
103         if count_chars[-1] != self._slash_n:
104             return
105         count_str = empty.join(count_chars)
106         if self.strict:
107             if count_str[-2:] != self._slash_rn:
108                 raise ValueError("chunk header invalid: %r" % count_str)
109             if self._slash_r in count_str[:-2]:
110                 raise ValueError("too many CRs in chunk header %r" % count_str)
111         self.body_length = int(count_str.rstrip(self._slash_nr), 16)
112         excess_bytes = len(count_str)
113         while excess_bytes:
114             if excess_bytes >= len(self.buffered_bytes[0]):
115                 excess_bytes -= len(self.buffered_bytes[0])
116                 del self.buffered_bytes[0]
117             else:
118                 self.buffered_bytes[0] = self.buffered_bytes[0][excess_bytes:]
119                 excess_bytes = 0
120         if not self.body_length:
121             self.state = self._finished
122             if not self.buffered_bytes:
123                 # May not call into self._finished with no buffered data.
124                 return empty
125         else:
126             self.state = self._read_body
127         return self.state()
128
129     def write(self, bytes):
130         """Decode bytes to the output stream.
131
132         :raises ValueError: If the stream has already seen the end of file
133             marker.
134         :returns: None, or the excess bytes beyond the end of file marker.
135         """
136         if bytes:
137             self.buffered_bytes.append(bytes)
138         return self.state()
139
140
141 class Encoder(object):
142     """Encode content to a stream using HTTP Chunked coding."""
143
144     def __init__(self, output):
145         """Create an encoder encoding to output.
146
147         :param output: A file-like object. Bytes written to the Encoder
148             will be encoded using HTTP chunking. Small writes may be buffered
149             and the ``close`` method must be called to finish the stream.
150         """
151         self.output = output
152         self.buffered_bytes = []
153         self.buffer_size = 0
154
155     def flush(self, extra_len=0):
156         """Flush the encoder to the output stream.
157
158         :param extra_len: Increase the size of the chunk by this many bytes
159             to allow for a subsequent write.
160         """
161         if not self.buffer_size and not extra_len:
162             return
163         buffered_bytes = self.buffered_bytes
164         buffer_size = self.buffer_size
165         self.buffered_bytes = []
166         self.buffer_size = 0
167         self.output.write(_b("%X\r\n" % (buffer_size + extra_len)))
168         if buffer_size:
169             self.output.write(empty.join(buffered_bytes))
170         return True
171
172     def write(self, bytes):
173         """Encode bytes to the output stream."""
174         bytes_len = len(bytes)
175         if self.buffer_size + bytes_len >= 65536:
176             self.flush(bytes_len)
177             self.output.write(bytes)
178         else:
179             self.buffered_bytes.append(bytes)
180             self.buffer_size += bytes_len
181
182     def close(self):
183         """Finish the stream. This does not close the output stream."""
184         self.flush()
185         self.output.write(_b("0\r\n"))