Change check() methods in pack.py to raise rather than return bools.
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_pack.py
1 # test_pack.py -- Tests for the handling of git packs.
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; version 2
8 # of the License, or (at your option) any later version of the license.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20
21 """Tests for Dulwich packs."""
22
23
24 from cStringIO import StringIO
25 import os
26 import unittest
27 import zlib
28
29 from dulwich.errors import (
30     ChecksumMismatch,
31     )
32 from dulwich.objects import (
33     Tree,
34     )
35 from dulwich.pack import (
36     Pack,
37     PackData,
38     apply_delta,
39     create_delta,
40     load_pack_index,
41     hex_to_sha,
42     read_zlib_chunks,
43     sha_to_hex,
44     write_pack_index_v1,
45     write_pack_index_v2,
46     write_pack,
47     )
48
49 pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
50
51 a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
52 tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
53 commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
54
55 class PackTests(unittest.TestCase):
56     """Base class for testing packs"""
57   
58     datadir = os.path.join(os.path.dirname(__file__), 'data/packs')
59   
60     def get_pack_index(self, sha):
61         """Returns a PackIndex from the datadir with the given sha"""
62         return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
63   
64     def get_pack_data(self, sha):
65         """Returns a PackData object from the datadir with the given sha"""
66         return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
67   
68     def get_pack(self, sha):
69         return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
70
71     def assertSucceeds(self, func, *args, **kwargs):
72         try:
73             func(*args, **kwargs)
74         except ChecksumMismatch, e:
75             self.fail(e)
76
77
78 class PackIndexTests(PackTests):
79     """Class that tests the index of packfiles"""
80   
81     def test_object_index(self):
82         """Tests that the correct object offset is returned from the index."""
83         p = self.get_pack_index(pack1_sha)
84         self.assertRaises(KeyError, p.object_index, pack1_sha)
85         self.assertEqual(p.object_index(a_sha), 178)
86         self.assertEqual(p.object_index(tree_sha), 138)
87         self.assertEqual(p.object_index(commit_sha), 12)
88   
89     def test_index_len(self):
90         p = self.get_pack_index(pack1_sha)
91         self.assertEquals(3, len(p))
92   
93     def test_get_stored_checksum(self):
94         p = self.get_pack_index(pack1_sha)
95         self.assertEquals("\xf2\x84\x8e*\xd1o2\x9a\xe1\xc9.;\x95\xe9\x18\x88\xda\xa5\xbd\x01", str(p.get_stored_checksum()))
96         self.assertEquals( 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7' , str(p.get_pack_checksum()))
97
98     def test_index_check(self):
99         p = self.get_pack_index(pack1_sha)
100         self.assertSucceeds(p.check)
101
102     def test_iterentries(self):
103         p = self.get_pack_index(pack1_sha)
104         self.assertEquals([('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, None), ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa', 138, None), ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12', 12, None)], list(p.iterentries()))
105   
106     def test_iter(self):
107         p = self.get_pack_index(pack1_sha)
108         self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
109   
110
111 class TestPackDeltas(unittest.TestCase):
112   
113     test_string1 = "The answer was flailing in the wind"
114     test_string2 = "The answer was falling down the pipe"
115     test_string3 = "zzzzz"
116   
117     test_string_empty = ""
118     test_string_big = "Z" * 8192
119   
120     def _test_roundtrip(self, base, target):
121         self.assertEquals([target],
122             apply_delta(base, create_delta(base, target)))
123   
124     def test_nochange(self):
125         self._test_roundtrip(self.test_string1, self.test_string1)
126   
127     def test_change(self):
128         self._test_roundtrip(self.test_string1, self.test_string2)
129   
130     def test_rewrite(self):
131         self._test_roundtrip(self.test_string1, self.test_string3)
132   
133     def test_overflow(self):
134         self._test_roundtrip(self.test_string_empty, self.test_string_big)
135
136
137 class TestPackData(PackTests):
138     """Tests getting the data from the packfile."""
139   
140     def test_create_pack(self):
141         p = self.get_pack_data(pack1_sha)
142   
143     def test_pack_len(self):
144         p = self.get_pack_data(pack1_sha)
145         self.assertEquals(3, len(p))
146
147     def test_index_check(self):
148         p = self.get_pack_data(pack1_sha)
149         self.assertSucceeds(p.check)
150
151     def test_iterobjects(self):
152         p = self.get_pack_data(pack1_sha)
153         self.assertEquals([(12, 1, 'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\nauthor James Westby <jw+debian@jameswestby.net> 1174945067 +0100\ncommitter James Westby <jw+debian@jameswestby.net> 1174945067 +0100\n\nTest commit\n', 3775879613L), (138, 2, '100644 a\x00og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 912998690L), (178, 3, 'test 1\n', 1373561701L)], [(len, type, "".join(chunks), offset) for (len, type, chunks, offset) in p.iterobjects()])
154   
155     def test_iterentries(self):
156         p = self.get_pack_data(pack1_sha)
157         self.assertEquals(set([('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 1373561701L), ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa', 138, 912998690L), ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12', 12, 3775879613L)]), set(p.iterentries()))
158   
159     def test_create_index_v1(self):
160         p = self.get_pack_data(pack1_sha)
161         p.create_index_v1("v1test.idx")
162         idx1 = load_pack_index("v1test.idx")
163         idx2 = self.get_pack_index(pack1_sha)
164         self.assertEquals(idx1, idx2)
165   
166     def test_create_index_v2(self):
167         p = self.get_pack_data(pack1_sha)
168         p.create_index_v2("v2test.idx")
169         idx1 = load_pack_index("v2test.idx")
170         idx2 = self.get_pack_index(pack1_sha)
171         self.assertEquals(idx1, idx2)
172
173
174 class TestPack(PackTests):
175
176     def test_len(self):
177         p = self.get_pack(pack1_sha)
178         self.assertEquals(3, len(p))
179
180     def test_contains(self):
181         p = self.get_pack(pack1_sha)
182         self.assertTrue(tree_sha in p)
183
184     def test_get(self):
185         p = self.get_pack(pack1_sha)
186         self.assertEquals(type(p[tree_sha]), Tree)
187
188     def test_iter(self):
189         p = self.get_pack(pack1_sha)
190         self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
191
192     def test_get_object_at(self):
193         """Tests random access for non-delta objects"""
194         p = self.get_pack(pack1_sha)
195         obj = p[a_sha]
196         self.assertEqual(obj.type_name, 'blob')
197         self.assertEqual(obj.sha().hexdigest(), a_sha)
198         obj = p[tree_sha]
199         self.assertEqual(obj.type_name, 'tree')
200         self.assertEqual(obj.sha().hexdigest(), tree_sha)
201         obj = p[commit_sha]
202         self.assertEqual(obj.type_name, 'commit')
203         self.assertEqual(obj.sha().hexdigest(), commit_sha)
204
205     def test_copy(self):
206         origpack = self.get_pack(pack1_sha)
207         self.assertSucceeds(origpack.index.check)
208         write_pack("Elch", [(x, "") for x in origpack.iterobjects()], 
209             len(origpack))
210         newpack = Pack("Elch")
211         self.assertEquals(origpack, newpack)
212         self.assertSucceeds(newpack.index.check)
213         self.assertEquals(origpack.name(), newpack.name())
214         self.assertEquals(origpack.index.get_pack_checksum(), 
215                           newpack.index.get_pack_checksum())
216
217         self.assertTrue(
218                 (origpack.index.version != newpack.index.version) or
219                 (origpack.index.get_stored_checksum() == newpack.index.get_stored_checksum()))
220
221     def test_commit_obj(self):
222         p = self.get_pack(pack1_sha)
223         commit = p[commit_sha]
224         self.assertEquals("James Westby <jw+debian@jameswestby.net>",
225             commit.author)
226         self.assertEquals([], commit.parents)
227
228     def test_name(self):
229         p = self.get_pack(pack1_sha)
230         self.assertEquals(pack1_sha, p.name())
231
232
233 class TestHexToSha(unittest.TestCase):
234
235     def test_simple(self):
236         self.assertEquals('\xab\xcd' * 10, hex_to_sha("abcd" * 10))
237
238     def test_reverse(self):
239         self.assertEquals("abcd" * 10, sha_to_hex('\xab\xcd' * 10))
240
241
242 class BaseTestPackIndexWriting(object):
243
244     def assertSucceeds(self, func, *args, **kwargs):
245         try:
246             func(*args, **kwargs)
247         except ChecksumMismatch, e:
248             self.fail(e)
249
250     def test_empty(self):
251         pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
252         self._write_fn("empty.idx", [], pack_checksum)
253         idx = load_pack_index("empty.idx")
254         self.assertSucceeds(idx.check)
255         self.assertEquals(idx.get_pack_checksum(), pack_checksum)
256         self.assertEquals(0, len(idx))
257
258     def test_single(self):
259         pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
260         my_entries = [('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 42)]
261         my_entries.sort()
262         self._write_fn("single.idx", my_entries, pack_checksum)
263         idx = load_pack_index("single.idx")
264         self.assertEquals(idx.version, self._expected_version)
265         self.assertSucceeds(idx.check)
266         self.assertEquals(idx.get_pack_checksum(), pack_checksum)
267         self.assertEquals(1, len(idx))
268         actual_entries = list(idx.iterentries())
269         self.assertEquals(len(my_entries), len(actual_entries))
270         for a, b in zip(my_entries, actual_entries):
271             self.assertEquals(a[0], b[0])
272             self.assertEquals(a[1], b[1])
273             if self._has_crc32_checksum:
274                 self.assertEquals(a[2], b[2])
275             else:
276                 self.assertTrue(b[2] is None)
277
278
279 class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
280
281     def setUp(self):
282         unittest.TestCase.setUp(self)
283         self._has_crc32_checksum = False
284         self._expected_version = 1
285         self._write_fn = write_pack_index_v1
286
287
288 class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
289
290     def setUp(self):
291         unittest.TestCase.setUp(self)
292         self._has_crc32_checksum = True
293         self._expected_version = 2
294         self._write_fn = write_pack_index_v2
295
296
297 class ReadZlibTests(unittest.TestCase):
298
299     decomp = (
300       'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
301       'parent None\n'
302       'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
303       'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
304       '\n'
305       "Provide replacement for mmap()'s offset argument.")
306     comp = zlib.compress(decomp)
307     extra = 'nextobject'
308
309     def setUp(self):
310         self.read = StringIO(self.comp + self.extra).read
311
312     def test_decompress_size(self):
313         good_decomp_len = len(self.decomp)
314         self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
315         self.assertRaises(zlib.error, read_zlib_chunks, self.read,
316                           good_decomp_len - 1)
317         self.assertRaises(zlib.error, read_zlib_chunks, self.read,
318                           good_decomp_len + 1)
319
320     def test_decompress_truncated(self):
321         read = StringIO(self.comp[:10]).read
322         self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
323
324         read = StringIO(self.comp).read
325         self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
326
327     def test_decompress_empty(self):
328         comp = zlib.compress('')
329         read = StringIO(comp + self.extra).read
330         decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
331         self.assertEqual('', ''.join(decomp))
332         self.assertEqual(len(comp), comp_len)
333         self.assertNotEquals('', unused_data)
334         self.assertEquals(self.extra, unused_data + read())
335
336     def _do_decompress_test(self, buffer_size):
337         decomp, comp_len, unused_data = read_zlib_chunks(
338           self.read, len(self.decomp), buffer_size=buffer_size)
339         self.assertEquals(self.decomp, ''.join(decomp))
340         self.assertEquals(len(self.comp), comp_len)
341         self.assertNotEquals('', unused_data)
342         self.assertEquals(self.extra, unused_data + self.read())
343
344     def test_simple_decompress(self):
345         self._do_decompress_test(4096)
346
347     # These buffer sizes are not intended to be realistic, but rather simulate
348     # larger buffer sizes that may end at various places.
349     def test_decompress_buffer_size_1(self):
350         self._do_decompress_test(1)
351
352     def test_decompress_buffer_size_2(self):
353         self._do_decompress_test(2)
354
355     def test_decompress_buffer_size_3(self):
356         self._do_decompress_test(3)
357
358     def test_decompress_buffer_size_4(self):
359         self._do_decompress_test(4)