dfdf23954c09733133ca1f0fe51b04d904d4666f
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_pack.py
1 # test_pack.py -- Tests for the handling of git packs.
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; version 2
8 # of the License, or (at your option) any later version of the license.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20
21 """Tests for Dulwich packs."""
22
23
from cStringIO import StringIO
import os
import shutil
import tempfile
import unittest
import zlib
28
29 from dulwich.objects import (
30     Tree,
31     )
32 from dulwich.pack import (
33     Pack,
34     PackData,
35     apply_delta,
36     create_delta,
37     load_pack_index,
38     hex_to_sha,
39     read_zlib_chunks,
40     sha_to_hex,
41     write_pack_index_v1,
42     write_pack_index_v2,
43     write_pack,
44     )
45
# Hex name of the sample pack used throughout these tests; the fixture files
# are looked up as 'pack-<pack1_sha>' (plus extension) in the data directory.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex SHA-1s of the three objects stored in that pack: a blob, the tree and
# the commit (see TestPack.test_get_object_at for the type checks).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
51
class PackTests(unittest.TestCase):
    """Common helpers for test cases that load the sample pack files."""

    # Directory holding the 'pack-<sha>.idx' / 'pack-<sha>.pack' fixtures,
    # resolved relative to this test module.
    datadir = os.path.join(os.path.dirname(__file__), 'data/packs')

    def get_pack_index(self, sha):
        """Load the index file named after ``sha`` from the data directory."""
        index_name = 'pack-%s.idx' % sha
        index_path = os.path.join(self.datadir, index_name)
        return load_pack_index(index_path)

    def get_pack_data(self, sha):
        """Open the pack data file named after ``sha`` from the data directory."""
        data_name = 'pack-%s.pack' % sha
        data_path = os.path.join(self.datadir, data_name)
        return PackData(data_path)

    def get_pack(self, sha):
        """Open the pack whose base name (no extension) is 'pack-<sha>'."""
        base_name = 'pack-%s' % sha
        base_path = os.path.join(self.datadir, base_name)
        return Pack(base_path)
67
68
class PackIndexTests(PackTests):
    """Checks on the index file that accompanies the sample pack."""

    def test_object_index(self):
        """Known objects map to their offsets; an unknown SHA raises KeyError."""
        index = self.get_pack_index(pack1_sha)
        # The pack's own name is not an object inside the pack.
        self.assertRaises(KeyError, index.object_index, pack1_sha)
        expected_offsets = [(a_sha, 178), (tree_sha, 138), (commit_sha, 12)]
        for sha, offset in expected_offsets:
            self.assertEqual(index.object_index(sha), offset)

    def test_index_len(self):
        """The index reports the three objects of the sample pack."""
        index = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(index))

    def test_get_stored_checksum(self):
        """Both checksums recorded in the index match the known values."""
        index = self.get_pack_index(pack1_sha)
        stored_checksum = str(index.get_stored_checksum())
        self.assertEqual(
            "\xf2\x84\x8e*\xd1o2\x9a\xe1\xc9.;\x95\xe9\x18\x88\xda\xa5\xbd\x01",
            stored_checksum)
        pack_checksum = str(index.get_pack_checksum())
        self.assertEqual(
            'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7',
            pack_checksum)

    def test_index_check(self):
        """The index passes its own consistency check."""
        index = self.get_pack_index(pack1_sha)
        self.assertEqual(True, index.check())

    def test_iterentries(self):
        """Entries come back as (binary sha, offset, None) in this order."""
        index = self.get_pack_index(pack1_sha)
        expected_entries = [
            ('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8',
             178, None),
            ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa',
             138, None),
            ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12',
             12, None),
        ]
        self.assertEqual(expected_entries, list(index.iterentries()))

    def test_iter(self):
        """Iterating the index yields the hex SHAs of all three objects."""
        index = self.get_pack_index(pack1_sha)
        expected_shas = set([tree_sha, commit_sha, a_sha])
        self.assertEqual(expected_shas, set(index))
100   
101
class TestPackDeltas(unittest.TestCase):
    """Round-trip checks: a created delta, once applied, rebuilds the target."""

    test_string1 = "The answer was flailing in the wind"
    test_string2 = "The answer was falling down the pipe"
    test_string3 = "zzzzz"

    test_string_empty = ""
    test_string_big = "Z" * 8192

    def _test_roundtrip(self, base, target):
        """Create a delta from ``base`` to ``target`` and apply it to ``base``.

        The applied result is expected to be a one-element list holding
        ``target``.
        """
        delta = create_delta(base, target)
        rebuilt = apply_delta(base, delta)
        self.assertEqual([target], rebuilt)

    def test_nochange(self):
        """Base and target are the same string."""
        unchanged = self.test_string1
        self._test_roundtrip(unchanged, unchanged)

    def test_change(self):
        """Target shares a prefix with the base but differs afterwards."""
        self._test_roundtrip(base=self.test_string1, target=self.test_string2)

    def test_rewrite(self):
        """Target has nothing in common with the base."""
        self._test_roundtrip(base=self.test_string1, target=self.test_string3)

    def test_overflow(self):
        """Empty base and an 8192-byte target."""
        self._test_roundtrip(base=self.test_string_empty,
                             target=self.test_string_big)
126
127
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def _assert_created_index_matches(self, create_index, filename):
        """Write an index with ``create_index`` and compare it to the fixture.

        :param create_index: Callable taking the path of the index to write.
        :param filename: File name to use for the index inside a scratch
            directory.

        The index is written to a temporary directory that is removed
        afterwards, so running the tests leaves no files in the current
        working directory.
        """
        tempdir = tempfile.mkdtemp()
        try:
            index_path = os.path.join(tempdir, filename)
            create_index(index_path)
            created = load_pack_index(index_path)
            shipped = self.get_pack_index(pack1_sha)
            self.assertEqual(created, shipped)
        finally:
            # NOTE(review): assumes a loaded index holds no handle that
            # blocks deletion of its file (matters on Windows) -- confirm.
            shutil.rmtree(tempdir)

    def test_create_pack(self):
        """Opening the sample pack data must not raise."""
        self.get_pack_data(pack1_sha)

    def test_pack_len(self):
        """The pack data reports three objects."""
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(3, len(p))

    def test_index_check(self):
        """The pack data passes its own consistency check."""
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(True, p.check())

    def test_iterobjects(self):
        """iterobjects() yields 4-tuples whose third item is a chunk list.

        The first field equals the offsets the index reports for the same
        objects and the last equals the third field of iterentries(), so the
        loop variables are named accordingly instead of shadowing the
        builtins ``len`` and ``type``.
        """
        p = self.get_pack_data(pack1_sha)
        expected = [
            (12, 1,
             'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
             'author James Westby <jw+debian@jameswestby.net> '
             '1174945067 +0100\n'
             'committer James Westby <jw+debian@jameswestby.net> '
             '1174945067 +0100\n'
             '\n'
             'Test commit\n',
             3775879613),
            (138, 2,
             '100644 a\x00og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8',
             912998690),
            (178, 3, 'test 1\n', 1373561701),
        ]
        actual = [(offset, type_num, "".join(chunks), crc32)
                  for (offset, type_num, chunks, crc32) in p.iterobjects()]
        self.assertEqual(expected, actual)

    def test_iterentries(self):
        """iterentries() yields one (binary sha, offset, crc32) per object."""
        p = self.get_pack_data(pack1_sha)
        expected = set([
            ('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8',
             178, 1373561701),
            ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa',
             138, 912998690),
            ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12',
             12, 3775879613),
        ])
        self.assertEqual(expected, set(p.iterentries()))

    def test_create_index_v1(self):
        """A freshly written v1 index equals the index shipped with the pack."""
        p = self.get_pack_data(pack1_sha)
        self._assert_created_index_matches(p.create_index_v1, "v1test.idx")

    def test_create_index_v2(self):
        """A freshly written v2 index equals the index shipped with the pack."""
        p = self.get_pack_data(pack1_sha)
        self._assert_created_index_matches(p.create_index_v2, "v2test.idx")
163
164
class TestPack(PackTests):
    """Tests for the Pack object built from the sample pack and its index."""

    def test_len(self):
        """The pack reports three objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(3, len(p))

    def test_contains(self):
        """Membership works with a hex SHA."""
        p = self.get_pack(pack1_sha)
        self.assertTrue(tree_sha in p)

    def test_get(self):
        """Looking up the tree SHA returns a Tree object."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        """Iterating the pack yields the hex SHAs of all three objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        p = self.get_pack(pack1_sha)
        expected_types = [
            (a_sha, 'blob'),
            (tree_sha, 'tree'),
            (commit_sha, 'commit'),
        ]
        for sha, type_name in expected_types:
            obj = p[sha]
            self.assertEqual(obj.type_name, type_name)
            self.assertEqual(obj.sha().hexdigest(), sha)

    def test_copy(self):
        """Rewriting the pack's objects yields an equal pack.

        The copy is written into a temporary directory that is removed
        afterwards, so the test no longer leaves 'Elch' files behind in the
        current working directory.
        """
        origpack = self.get_pack(pack1_sha)
        self.assertEqual(True, origpack.index.check())
        tempdir = tempfile.mkdtemp()
        try:
            basename = os.path.join(tempdir, "Elch")
            write_pack(basename, [(x, "") for x in origpack.iterobjects()],
                       len(origpack))
            newpack = Pack(basename)
            self.assertEqual(origpack, newpack)
            self.assertEqual(True, newpack.index.check())
            self.assertEqual(origpack.name(), newpack.name())
            self.assertEqual(origpack.index.get_pack_checksum(),
                             newpack.index.get_pack_checksum())

            # The index checksums can only be compared when both indexes use
            # the same format version.
            self.assertTrue(
                (origpack.index.version != newpack.index.version) or
                (origpack.index.get_stored_checksum() ==
                 newpack.index.get_stored_checksum()))
        finally:
            # NOTE(review): assumes the new pack holds no handle that blocks
            # deletion of its files (matters on Windows) -- confirm.
            shutil.rmtree(tempdir)

    def test_commit_obj(self):
        """The commit object exposes its author and has no parents."""
        p = self.get_pack(pack1_sha)
        commit = p[commit_sha]
        self.assertEqual("James Westby <jw+debian@jameswestby.net>",
                         commit.author)
        self.assertEqual([], commit.parents)

    def test_name(self):
        """The pack's name is the SHA embedded in its file name."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(pack1_sha, p.name())
222
223
class TestHexToSha(unittest.TestCase):
    """Conversions between 40-character hex SHAs and 20-byte binary SHAs."""

    def test_simple(self):
        """hex_to_sha turns the hex form into the binary form."""
        binary_sha = hex_to_sha("abcd" * 10)
        self.assertEqual('\xab\xcd' * 10, binary_sha)

    def test_reverse(self):
        """sha_to_hex turns the binary form back into the hex form."""
        hex_sha = sha_to_hex('\xab\xcd' * 10)
        self.assertEqual("abcd" * 10, hex_sha)
231
232
class BaseTestPackIndexWriting(object):
    """Mixin with index-writing tests shared by the v1 and v2 writers.

    The concrete test case is expected to set, in its ``setUp``:

    * ``_write_fn``: callable taking ``(path, entries, pack_checksum)``
    * ``_expected_version``: index version the written file should report
    * ``_has_crc32_checksum``: whether entries read back carry the third
      field that was written (otherwise it must read back as None)

    Each test writes its index into a temporary directory that is removed
    afterwards, so no ``*.idx`` files are left in the working directory.
    """

    def test_empty(self):
        """An index without entries round-trips and keeps the pack checksum."""
        pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
        tempdir = tempfile.mkdtemp()
        try:
            path = os.path.join(tempdir, "empty.idx")
            self._write_fn(path, [], pack_checksum)
            idx = load_pack_index(path)
            self.assertTrue(idx.check())
            self.assertEqual(idx.get_pack_checksum(), pack_checksum)
            self.assertEqual(0, len(idx))
        finally:
            # NOTE(review): assumes a loaded index holds no handle that
            # blocks deletion of its file (matters on Windows) -- confirm.
            shutil.rmtree(tempdir)

    def test_single(self):
        """An index with one entry round-trips with the expected fields."""
        pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
        my_entries = [('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 42)]
        my_entries.sort()
        tempdir = tempfile.mkdtemp()
        try:
            path = os.path.join(tempdir, "single.idx")
            self._write_fn(path, my_entries, pack_checksum)
            idx = load_pack_index(path)
            self.assertEqual(idx.version, self._expected_version)
            self.assertTrue(idx.check())
            self.assertEqual(idx.get_pack_checksum(), pack_checksum)
            self.assertEqual(1, len(idx))
            actual_entries = list(idx.iterentries())
            self.assertEqual(len(my_entries), len(actual_entries))
            for expected, actual in zip(my_entries, actual_entries):
                self.assertEqual(expected[0], actual[0])
                self.assertEqual(expected[1], actual[1])
                if self._has_crc32_checksum:
                    self.assertEqual(expected[2], actual[2])
                else:
                    self.assertTrue(actual[2] is None)
        finally:
            # NOTE(review): same assumption about open handles as in
            # test_empty -- confirm.
            shutil.rmtree(tempdir)
262
263
class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against the version 1 writer."""

    def setUp(self):
        """Select the v1 writer and the expectations that go with it."""
        unittest.TestCase.setUp(self)
        self._write_fn = write_pack_index_v1
        self._expected_version = 1
        # With this flag off, the shared tests expect the third entry field
        # to read back as None.
        self._has_crc32_checksum = False
271
272
class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against the version 2 writer."""

    def setUp(self):
        """Select the v2 writer and the expectations that go with it."""
        unittest.TestCase.setUp(self)
        self._write_fn = write_pack_index_v2
        self._expected_version = 2
        # With this flag on, the shared tests expect the third entry field
        # to read back exactly as written.
        self._has_crc32_checksum = True
280
281
class ReadZlibTests(unittest.TestCase):
    """Tests for read_zlib_chunks on a zlib stream followed by extra data."""

    decomp = (
      'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      'parent None\n'
      'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      '\n'
      "Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    # Bytes that follow the compressed stream, standing in for whatever comes
    # next in the input.
    extra = 'nextobject'

    def setUp(self):
        """Provide a read callable over the compressed data plus ``extra``."""
        self.read = StringIO(self.comp + self.extra).read

    def test_decompress_size(self):
        """A negative size raises ValueError; a wrong size raises zlib.error."""
        good_decomp_len = len(self.decomp)
        self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
        for bad_len in (good_decomp_len - 1, good_decomp_len + 1):
            # Use a fresh stream for every size: a previous call may already
            # have consumed input, in which case a later call could fail for
            # a reason unrelated to the size being checked.
            read = StringIO(self.comp + self.extra).read
            self.assertRaises(zlib.error, read_zlib_chunks, read, bad_len)

    def test_decompress_truncated(self):
        """Input that ends early raises zlib.error."""
        # Stream cut off after ten bytes.
        read = StringIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

        # Complete stream, but with nothing following it.
        read = StringIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

    def test_decompress_empty(self):
        """An empty payload decompresses to '' and leaves the extra data."""
        comp = zlib.compress('')
        read = StringIO(comp + self.extra).read
        decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
        self.assertEqual('', ''.join(decomp))
        self.assertEqual(len(comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Whatever was read past the stream plus what is still unread must
        # add up to the extra data.
        self.assertEqual(self.extra, unused_data + read())

    def _do_decompress_test(self, buffer_size):
        """Decompress ``self.comp`` reading ``buffer_size`` bytes at a time.

        Checks the decompressed text, the reported compressed length and
        that the trailing data is fully accounted for.
        """
        decomp, comp_len, unused_data = read_zlib_chunks(
          self.read, len(self.decomp), buffer_size=buffer_size)
        self.assertEqual(self.decomp, ''.join(decomp))
        self.assertEqual(len(self.comp), comp_len)
        self.assertNotEqual('', unused_data)
        self.assertEqual(self.extra, unused_data + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)
344         self._do_decompress_test(4)