Clean up file headers.
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_web.py
1 # test_web.py -- Tests for the git HTTP server
2 # Copyright (C) 2010 Google, Inc.
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; version 2
7 # or (at your option) any later version of the License.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
17 # MA  02110-1301, USA.
18
19 """Tests for the Git HTTP server."""
20
21 from cStringIO import StringIO
22 import re
23 from unittest import TestCase
24
25 from dulwich.objects import (
26     Blob,
27     )
28 from dulwich.web import (
29     HTTP_OK,
30     HTTP_NOT_FOUND,
31     HTTP_FORBIDDEN,
32     send_file,
33     get_info_refs,
34     handle_service_request,
35     _LengthLimitedFile,
36     HTTPGitRequest,
37     HTTPGitApplication,
38     )
39
40
class WebTestCase(TestCase):
    """Shared fixture for the web tests.

    Creates an ``HTTPGitRequest`` over an initially empty environ dict and
    records whatever is passed to the ``start_response`` callable, so that
    tests can assert on ``self._status`` and ``self._headers``.
    """

    def _handlers(self):
        """Return the ``handlers`` argument given to the request.

        The base fixture passes None; subclasses may override this.
        """
        return None

    def _start_response(self, status, headers):
        """Record the status and a list copy of the headers."""
        self._status = status
        self._headers = list(headers)

    def setUp(self):
        environ = {}
        self._environ = environ
        self._req = HTTPGitRequest(environ, self._start_response,
                                   handlers=self._handlers())
        # The recorded response is reset only after the request exists.
        self._headers = []
        self._status = None
57
58
class DumbHandlersTestCase(WebTestCase):
    """Tests for ``send_file`` and for ``get_info_refs`` without a service."""

    def test_send_file_not_found(self):
        # Passing None instead of a file object must yield the
        # not-found status.
        list(send_file(self._req, None, 'text/plain'))
        self.assertEqual(HTTP_NOT_FOUND, self._status)

    def test_send_file(self):
        source = StringIO('foobar')
        chunks = send_file(self._req, source, 'text/plain')
        body = ''.join(chunks)
        self.assertEqual('foobar', body)
        self.assertEqual(HTTP_OK, self._status)
        self.assertTrue(('Content-Type', 'text/plain') in self._headers)
        self.assertTrue(source.closed)

    def test_send_file_buffered(self):
        # Two chunks of 10240 bytes each are expected back as two separate
        # items (presumably 10240 is send_file's read size -- confirm
        # against dulwich.web).
        chunk = 'x' * 10240
        source = StringIO(chunk + chunk)
        expected = [chunk, chunk]
        actual = list(send_file(self._req, source, 'text/plain'))
        self.assertEqual(expected, actual)
        self.assertEqual(HTTP_OK, self._status)
        self.assertTrue(('Content-Type', 'text/plain') in self._headers)
        self.assertTrue(source.closed)

    def test_send_file_error(self):
        class FailingFile(object):
            # File-like object whose read() always fails.
            closed = False

            def __init__(self):
                self.closed = False

            def read(self, size=-1):
                raise IOError

            def close(self):
                self.closed = True

        broken = FailingFile()
        list(send_file(self._req, broken, 'text/plain'))
        self.assertEqual(HTTP_NOT_FOUND, self._status)
        # The file must be closed even though reading failed.
        self.assertTrue(broken.closed)

    def test_get_info_refs(self):
        # No "service" parameter in the query string.
        self._environ['QUERY_STRING'] = ''

        class FakeBlob(object):
            def __init__(self, sha):
                self._fake_sha = sha

            def sha(self):
                return self._fake_sha

        class FakeTag(object):
            def __init__(self, sha, obj_class, obj_sha):
                self._fake_sha = sha
                self.object = (obj_class, obj_sha)

            def sha(self):
                return self._fake_sha

        master_blob = FakeBlob('111')
        tagged_blob = FakeBlob('222')
        plain_blob = FakeBlob('333')
        annotated_tag = FakeTag('aaa', Blob, '222')

        def advertised_refs():
            # Build a fresh dict on every call so callers never share state.
            return {
                'HEAD': '000',
                'refs/heads/master': master_blob.sha(),
                'refs/tags/tag-tag': annotated_tag.sha(),
                'refs/tags/blob-tag': plain_blob.sha(),
                }

        class FakeRepo(object):

            def __init__(self, objects, peeled):
                self._peeled = peeled
                self._objects = {}
                for obj in objects:
                    self._objects[obj.sha()] = obj

            def get_peeled(self, sha):
                return self._peeled[sha]

            def __getitem__(self, sha):
                return self._objects[sha]

            def get_refs(self):
                return advertised_refs()

        class FakeBackend(object):
            def __init__(self):
                # Peeled values: only the annotated tag differs from its
                # advertised value (it peels to the blob it points at).
                peeled = {
                    'HEAD': '000',
                    'refs/heads/master': master_blob.sha(),
                    'refs/tags/tag-tag': tagged_blob.sha(),
                    'refs/tags/blob-tag': plain_blob.sha(),
                    }
                stored = [master_blob, tagged_blob, plain_blob,
                          annotated_tag]
                self.repo = FakeRepo(stored, peeled)

            def open_repository(self, path):
                assert path == '/'
                return self.repo

            def get_refs(self):
                return advertised_refs()

        mat = re.search('.*', '//info/refs')
        # HEAD is not expected in the output; the annotated tag is expected
        # twice: once as itself and once peeled ("^{}").
        expected = [
            '111\trefs/heads/master\n',
            '333\trefs/tags/blob-tag\n',
            'aaa\trefs/tags/tag-tag\n',
            '222\trefs/tags/tag-tag^{}\n',
            ]
        actual = list(get_info_refs(self._req, FakeBackend(), mat))
        self.assertEqual(expected, actual)
165
166
class SmartHandlersTestCase(WebTestCase):
    """Tests for ``handle_service_request`` and service ``get_info_refs``."""

    class _TestUploadPackHandler(object):
        # Minimal handler: remembers its constructor arguments and echoes
        # back whatever it reads from the protocol object.

        def __init__(self, backend, args, proto, stateless_rpc=False,
                     advertise_refs=False):
            self.advertise_refs = advertise_refs
            self.stateless_rpc = stateless_rpc
            self.proto = proto
            self.args = args

        def handle(self):
            proto = self.proto
            received = proto.recv(1024)
            proto.write('handled input: %s' % received)

    def _make_handler(self, *args, **kwargs):
        """Create the test handler and keep it on ``self._handler``."""
        handler = self._TestUploadPackHandler(*args, **kwargs)
        self._handler = handler
        return handler

    def _handlers(self):
        """Register the test handler factory for ``git-upload-pack``."""
        return {'git-upload-pack': self._make_handler}

    def test_handle_service_request_unknown(self):
        # A service with no registered handler must be refused.
        mat = re.search('.*', '/git-evil-handler')
        list(handle_service_request(self._req, 'backend', mat))
        self.assertEqual(HTTP_FORBIDDEN, self._status)

    def test_handle_service_request(self):
        self._environ['wsgi.input'] = StringIO('foo')
        mat = re.search('.*', '/git-upload-pack')
        chunks = handle_service_request(self._req, 'backend', mat)
        output = ''.join(chunks)
        self.assertEqual('handled input: foo', output)
        expected_header = ('Content-Type',
                           'application/x-git-upload-pack-response')
        self.assertTrue(expected_header in self._headers)
        self.assertFalse(self._handler.advertise_refs)
        self.assertTrue(self._handler.stateless_rpc)

    def test_handle_service_request_with_length(self):
        # Only the first CONTENT_LENGTH bytes of the input should reach
        # the handler.
        self._environ['wsgi.input'] = StringIO('foobar')
        self._environ['CONTENT_LENGTH'] = 3
        mat = re.search('.*', '/git-upload-pack')
        chunks = handle_service_request(self._req, 'backend', mat)
        output = ''.join(chunks)
        self.assertEqual('handled input: foo', output)
        expected_header = ('Content-Type',
                           'application/x-git-upload-pack-response')
        self.assertTrue(expected_header in self._headers)

    def test_get_info_refs_unknown(self):
        self._environ['QUERY_STRING'] = 'service=git-evil-handler'
        list(get_info_refs(self._req, 'backend', None))
        self.assertEqual(HTTP_FORBIDDEN, self._status)

    def test_get_info_refs(self):
        self._environ['wsgi.input'] = StringIO('foo')
        self._environ['QUERY_STRING'] = 'service=git-upload-pack'

        mat = re.search('.*', '/git-upload-pack')
        output = ''.join(get_info_refs(self._req, 'backend', mat))
        service_line = '001e# service=git-upload-pack\n'
        flush_pkt = '0000'
        # input is ignored by the handler
        handler_output = 'handled input: '
        expected = service_line + flush_pkt + handler_output
        self.assertEqual(expected, output)
        self.assertTrue(self._handler.advertise_refs)
        self.assertTrue(self._handler.stateless_rpc)
228
229
class LengthLimitedFileTestCase(TestCase):
    """Tests for the ``_LengthLimitedFile`` wrapper."""

    def test_no_cutoff(self):
        # A limit larger than the data leaves the content untouched.
        limited = _LengthLimitedFile(StringIO('foobar'), 1024)
        content = limited.read()
        self.assertEqual('foobar', content)

    def test_cutoff(self):
        # Only the first 3 bytes are readable; afterwards reads are empty.
        limited = _LengthLimitedFile(StringIO('foobar'), 3)
        first = limited.read()
        self.assertEqual('foo', first)
        second = limited.read()
        self.assertEqual('', second)

    def test_multiple_reads(self):
        # The limit applies across successive sized reads.
        limited = _LengthLimitedFile(StringIO('foobar'), 3)
        first = limited.read(2)
        self.assertEqual('fo', first)
        second = limited.read(2)
        self.assertEqual('o', second)
        rest = limited.read()
        self.assertEqual('', rest)
245
246
class HTTPGitRequestTestCase(WebTestCase):
    """Tests for the response helpers on ``HTTPGitRequest``."""

    def _check_plain_error(self, responder, expected_status):
        # Shared body of the error-response tests: the responder must
        # return the message, set the expected status and send only a
        # text/plain Content-Type header.
        self._req.cache_forever()  # cache headers should be discarded
        message = 'Something not found'
        self.assertEqual(message, responder(message))
        self.assertEqual(expected_status, self._status)
        only_content_type = set([('Content-Type', 'text/plain')])
        self.assertEqual(only_content_type, set(self._headers))

    def test_not_found(self):
        self._check_plain_error(self._req.not_found, HTTP_NOT_FOUND)

    def test_forbidden(self):
        self._check_plain_error(self._req.forbidden, HTTP_FORBIDDEN)

    def test_respond_ok(self):
        # With no arguments, respond() sends HTTP_OK and no headers.
        self._req.respond()
        self.assertEqual([], self._headers)
        self.assertEqual(HTTP_OK, self._status)

    def test_respond(self):
        self._req.nocache()
        extra_headers = [('X-Foo', 'foo'), ('X-Bar', 'bar')]
        self._req.respond(status=402, content_type='some/type',
                          headers=extra_headers)
        # Expect the explicit headers, the content type and the headers
        # contributed by nocache().
        expected = set([
            ('X-Foo', 'foo'),
            ('X-Bar', 'bar'),
            ('Content-Type', 'some/type'),
            ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
            ('Pragma', 'no-cache'),
            ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
            ])
        self.assertEqual(expected, set(self._headers))
        self.assertEqual(402, self._status)
282
283
class HTTPGitApplicationTestCase(TestCase):
    """Tests for request dispatch in ``HTTPGitApplication``."""

    def setUp(self):
        self._app = HTTPGitApplication('backend')

    def test_call(self):
        environ = {
            'PATH_INFO': '/foo',
            'REQUEST_METHOD': 'GET',
            }

        def test_handler(req, backend, mat):
            # tests interface used by all handlers
            self.assertEqual(environ, req.environ)
            self.assertEqual('backend', backend)
            self.assertEqual('/foo', mat.group(0))
            return 'output'

        # Replace the service table with a single GET route.
        route = ('GET', re.compile('/foo$'))
        services = {}
        services[route] = test_handler
        self._app.services = services

        result = self._app(environ, None)
        self.assertEqual('output', result)