Previously, the word "type" was massively overloaded in objects.py. It
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_web.py
1 # test_web.py -- Tests for the git HTTP server
2 # Copryight (C) 2010 Google, Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; version 2
7 # or (at your option) any later version of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 # MA  02110-1301, USA.
18
19 """Tests for the Git HTTP server."""
20
21 from cStringIO import StringIO
22 import re
23 from unittest import TestCase
24
25 from dulwich.objects import (
26     Tag,
27     Blob,
28     )
29 from dulwich.web import (
30     HTTP_OK,
31     HTTP_NOT_FOUND,
32     HTTP_FORBIDDEN,
33     send_file,
34     get_info_refs,
35     handle_service_request,
36     _LengthLimitedFile,
37     HTTPGitRequest,
38     HTTPGitApplication,
39     )
40
41
42 class WebTestCase(TestCase):
43     """Base TestCase that sets up some useful instance vars."""
44     def setUp(self):
45         self._environ = {}
46         self._req = HTTPGitRequest(self._environ, self._start_response)
47         self._status = None
48         self._headers = []
49
50     def _start_response(self, status, headers):
51         self._status = status
52         self._headers = list(headers)
53
54
55 class DumbHandlersTestCase(WebTestCase):
56
57     def test_send_file_not_found(self):
58         list(send_file(self._req, None, 'text/plain'))
59         self.assertEquals(HTTP_NOT_FOUND, self._status)
60
61     def test_send_file(self):
62         f = StringIO('foobar')
63         output = ''.join(send_file(self._req, f, 'text/plain'))
64         self.assertEquals('foobar', output)
65         self.assertEquals(HTTP_OK, self._status)
66         self.assertTrue(('Content-Type', 'text/plain') in self._headers)
67         self.assertTrue(f.closed)
68
69     def test_send_file_buffered(self):
70         bufsize = 10240
71         xs = 'x' * bufsize
72         f = StringIO(2 * xs)
73         self.assertEquals([xs, xs],
74                           list(send_file(self._req, f, 'text/plain')))
75         self.assertEquals(HTTP_OK, self._status)
76         self.assertTrue(('Content-Type', 'text/plain') in self._headers)
77         self.assertTrue(f.closed)
78
79     def test_send_file_error(self):
80         class TestFile(object):
81             def __init__(self):
82                 self.closed = False
83
84             def read(self, size=-1):
85                 raise IOError
86
87             def close(self):
88                 self.closed = True
89
90         f = TestFile()
91         list(send_file(self._req, f, 'text/plain'))
92         self.assertEquals(HTTP_NOT_FOUND, self._status)
93         self.assertTrue(f.closed)
94
95     def test_get_info_refs(self):
96         self._environ['QUERY_STRING'] = ''
97
98         class TestTag(object):
99             def __init__(self, sha, obj_class, obj_sha):
100                 self.sha = lambda: sha
101                 self.object = (obj_class, obj_sha)
102
103         class TestBlob(object):
104             def __init__(self, sha):
105                 self.sha = lambda: sha
106
107         blob1 = TestBlob('111')
108         blob2 = TestBlob('222')
109         blob3 = TestBlob('333')
110
111         tag1 = TestTag('aaa', Blob, '222')
112
113         class TestRepo(object):
114             def __init__(self, objects, peeled):
115                 self._objects = dict((o.sha(), o) for o in objects)
116                 self._peeled = peeled
117
118             def get_peeled(self, sha):
119                 return self._peeled[sha]
120
121             def __getitem__(self, sha):
122                 return self._objects[sha]
123
124         class TestBackend(object):
125             def __init__(self):
126                 objects = [blob1, blob2, blob3, tag1]
127                 self.repo = TestRepo(objects, {
128                     'HEAD': '000',
129                     'refs/heads/master': blob1.sha(),
130                     'refs/tags/tag-tag': blob2.sha(),
131                     'refs/tags/blob-tag': blob3.sha(),
132                     })
133
134             def get_refs(self):
135                 return {
136                     'HEAD': '000',
137                     'refs/heads/master': blob1.sha(),
138                     'refs/tags/tag-tag': tag1.sha(),
139                     'refs/tags/blob-tag': blob3.sha(),
140                     }
141
142         self.assertEquals(['111\trefs/heads/master\n',
143                            '333\trefs/tags/blob-tag\n',
144                            'aaa\trefs/tags/tag-tag\n',
145                            '222\trefs/tags/tag-tag^{}\n'],
146                           list(get_info_refs(self._req, TestBackend(), None)))
147
148
149 class SmartHandlersTestCase(WebTestCase):
150
151     class TestProtocol(object):
152         def __init__(self, handler):
153             self._handler = handler
154
155         def write_pkt_line(self, line):
156             if line is None:
157                 self._handler.write('flush-pkt\n')
158             else:
159                 self._handler.write('pkt-line: %s' % line)
160
161     class _TestUploadPackHandler(object):
162         def __init__(self, backend, read, write, stateless_rpc=False,
163                      advertise_refs=False):
164             self.read = read
165             self.write = write
166             self.proto = SmartHandlersTestCase.TestProtocol(self)
167             self.stateless_rpc = stateless_rpc
168             self.advertise_refs = advertise_refs
169
170         def handle(self):
171             self.write('handled input: %s' % self.read())
172
173     def _MakeHandler(self, *args, **kwargs):
174         self._handler = self._TestUploadPackHandler(*args, **kwargs)
175         return self._handler
176
177     def services(self):
178         return {'git-upload-pack': self._MakeHandler}
179
180     def test_handle_service_request_unknown(self):
181         mat = re.search('.*', '/git-evil-handler')
182         list(handle_service_request(self._req, 'backend', mat))
183         self.assertEquals(HTTP_FORBIDDEN, self._status)
184
185     def test_handle_service_request(self):
186         self._environ['wsgi.input'] = StringIO('foo')
187         mat = re.search('.*', '/git-upload-pack')
188         output = ''.join(handle_service_request(self._req, 'backend', mat,
189                                                 services=self.services()))
190         self.assertEqual('handled input: foo', output)
191         response_type = 'application/x-git-upload-pack-response'
192         self.assertTrue(('Content-Type', response_type) in self._headers)
193         self.assertFalse(self._handler.advertise_refs)
194         self.assertTrue(self._handler.stateless_rpc)
195
196     def test_handle_service_request_with_length(self):
197         self._environ['wsgi.input'] = StringIO('foobar')
198         self._environ['CONTENT_LENGTH'] = 3
199         mat = re.search('.*', '/git-upload-pack')
200         output = ''.join(handle_service_request(self._req, 'backend', mat,
201                                                 services=self.services()))
202         self.assertEqual('handled input: foo', output)
203         response_type = 'application/x-git-upload-pack-response'
204         self.assertTrue(('Content-Type', response_type) in self._headers)
205
206     def test_get_info_refs_unknown(self):
207         self._environ['QUERY_STRING'] = 'service=git-evil-handler'
208         list(get_info_refs(self._req, 'backend', None,
209                            services=self.services()))
210         self.assertEquals(HTTP_FORBIDDEN, self._status)
211
212     def test_get_info_refs(self):
213         self._environ['wsgi.input'] = StringIO('foo')
214         self._environ['QUERY_STRING'] = 'service=git-upload-pack'
215
216         output = ''.join(get_info_refs(self._req, 'backend', None,
217                                        services=self.services()))
218         self.assertEquals(('pkt-line: # service=git-upload-pack\n'
219                            'flush-pkt\n'
220                            # input is ignored by the handler
221                            'handled input: '), output)
222         self.assertTrue(self._handler.advertise_refs)
223         self.assertTrue(self._handler.stateless_rpc)
224
225
226 class LengthLimitedFileTestCase(TestCase):
227     def test_no_cutoff(self):
228         f = _LengthLimitedFile(StringIO('foobar'), 1024)
229         self.assertEquals('foobar', f.read())
230
231     def test_cutoff(self):
232         f = _LengthLimitedFile(StringIO('foobar'), 3)
233         self.assertEquals('foo', f.read())
234         self.assertEquals('', f.read())
235
236     def test_multiple_reads(self):
237         f = _LengthLimitedFile(StringIO('foobar'), 3)
238         self.assertEquals('fo', f.read(2))
239         self.assertEquals('o', f.read(2))
240         self.assertEquals('', f.read())
241
242
243 class HTTPGitRequestTestCase(WebTestCase):
244     def test_not_found(self):
245         self._req.cache_forever()  # cache headers should be discarded
246         message = 'Something not found'
247         self.assertEquals(message, self._req.not_found(message))
248         self.assertEquals(HTTP_NOT_FOUND, self._status)
249         self.assertEquals(set([('Content-Type', 'text/plain')]),
250                           set(self._headers))
251
252     def test_forbidden(self):
253         self._req.cache_forever()  # cache headers should be discarded
254         message = 'Something not found'
255         self.assertEquals(message, self._req.forbidden(message))
256         self.assertEquals(HTTP_FORBIDDEN, self._status)
257         self.assertEquals(set([('Content-Type', 'text/plain')]),
258                           set(self._headers))
259
260     def test_respond_ok(self):
261         self._req.respond()
262         self.assertEquals([], self._headers)
263         self.assertEquals(HTTP_OK, self._status)
264
265     def test_respond(self):
266         self._req.nocache()
267         self._req.respond(status=402, content_type='some/type',
268                           headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
269         self.assertEquals(set([
270             ('X-Foo', 'foo'),
271             ('X-Bar', 'bar'),
272             ('Content-Type', 'some/type'),
273             ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
274             ('Pragma', 'no-cache'),
275             ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
276             ]), set(self._headers))
277         self.assertEquals(402, self._status)
278
279
280 class HTTPGitApplicationTestCase(TestCase):
281     def setUp(self):
282         self._app = HTTPGitApplication('backend')
283
284     def test_call(self):
285         def test_handler(req, backend, mat):
286             # tests interface used by all handlers
287             self.assertEquals(environ, req.environ)
288             self.assertEquals('backend', backend)
289             self.assertEquals('/foo', mat.group(0))
290             return 'output'
291
292         self._app.services = {
293             ('GET', re.compile('/foo$')): test_handler,
294         }
295         environ = {
296             'PATH_INFO': '/foo',
297             'REQUEST_METHOD': 'GET',
298             }
299         self.assertEquals('output', self._app(environ, None))