129836248088d50b0d5da881ec4b6f6c41c9983c
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_pack.py
1 # test_pack.py -- Tests for the handling of git packs.
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; version 2
8 # of the License, or (at your option) any later version of the license.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20 """Tests for Dulwich packs."""
21
22
23 from cStringIO import StringIO
24 import os
25 import shutil
26 import tempfile
27 import zlib
28
29 from dulwich.errors import (
30     ChecksumMismatch,
31     )
32 from dulwich.file import (
33     GitFile,
34     )
35 from dulwich.objects import (
36     hex_to_sha,
37     sha_to_hex,
38     Tree,
39     )
40 from dulwich.pack import (
41     Pack,
42     PackData,
43     ThinPackData,
44     apply_delta,
45     create_delta,
46     load_pack_index,
47     read_zlib_chunks,
48     write_pack_header,
49     write_pack_index_v1,
50     write_pack_index_v2,
51     write_pack,
52     )
53 from dulwich.tests import (
54     TestCase,
55     )
56
# Name of the fixture pack used throughout this module; the helpers below load
# 'pack-<pack1_sha>.idx' and 'pack-<pack1_sha>.pack' from the data/packs
# directory next to this file.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex SHAs of the three objects the tests expect to find in that pack:
# a blob, a tree and a commit respectively (see TestPack.test_get_object_at).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
62
63
class PackTests(TestCase):
    """Base class for testing packs.

    Gives every test a scratch directory (``self.tempdir``) that is removed
    again on tear-down, and provides helpers that load the fixture files
    stored under ``data/packs`` next to this module.
    """

    # Directory holding the pack/index fixtures, relative to this module.
    datadir = os.path.join(os.path.dirname(__file__), 'data/packs')

    def setUp(self):
        super(PackTests, self).setUp()
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        # Undo setUp in reverse order: drop the scratch directory first.
        shutil.rmtree(self.tempdir)
        super(PackTests, self).tearDown()

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha"""
        return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha"""
        return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))

    def get_pack(self, sha):
        """Returns a Pack from the datadir with the given sha"""
        return Pack(os.path.join(self.datadir, 'pack-%s' % sha))

    def assertSucceeds(self, func, *args, **kwargs):
        """Call func(*args, **kwargs); fail the test on ChecksumMismatch.

        Any other exception propagates unchanged and is reported as an error.
        """
        try:
            func(*args, **kwargs)
        # "except X as e" is accepted by Python 2.6+ and Python 3, whereas the
        # older "except X, e" spelling is a syntax error on Python 3.
        except ChecksumMismatch as e:
            self.fail(e)
93
94
class PackIndexTests(PackTests):
    """Class that tests the index of packfiles"""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        # The pack's own name is not an object stored in the pack.
        self.assertRaises(KeyError, p.object_index, pack1_sha)
        self.assertEqual(p.object_index(a_sha), 178)
        self.assertEqual(p.object_index(tree_sha), 138)
        self.assertEqual(p.object_index(commit_sha), 12)

    def test_index_len(self):
        """The fixture index lists exactly three objects."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self):
        """Checks both checksums recorded in the index file."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(p.get_stored_checksum()))
        self.assertEqual('721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(p.get_pack_checksum()))

    def test_index_check(self):
        """check() on the fixture index must not raise ChecksumMismatch."""
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self):
        """iterentries() yields (sha, offset, crc32) for every object."""
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        # The fixture index yields None in the crc32 slot for every entry.
        self.assertEqual([
            (a_sha, 178, None),
            (tree_sha, 138, None),
            (commit_sha, 12, None),
            ], entries)

    def test_iter(self):
        """Iterating the index yields the hex SHAs of its objects."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
133
134
class TestPackDeltas(TestCase):
    """Round-trip checks for create_delta() followed by apply_delta()."""

    test_string1 = 'The answer was flailing in the wind'
    test_string2 = 'The answer was falling down the pipe'
    test_string3 = 'zzzzz'

    test_string_empty = ''
    test_string_big = 'Z' * 8192

    def _test_roundtrip(self, base, target):
        """Assert that a delta from base to target rebuilds target.

        :param base: string the delta is computed against
        :param target: string the delta must reproduce
        """
        delta = create_delta(base, target)
        # apply_delta() hands back chunks; glue them together for comparison.
        rebuilt = ''.join(apply_delta(base, delta))
        self.assertEqual(target, rebuilt)

    def test_nochange(self):
        """Base and target are the same string."""
        same = self.test_string1
        self._test_roundtrip(same, same)

    def test_change(self):
        """Target shares its beginning with the base."""
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        """Target has nothing in common with the base."""
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_overflow(self):
        """Empty base, 8192-byte target."""
        self._test_roundtrip(self.test_string_empty, self.test_string_big)
159
160
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def _pack_path(self, sha):
        """Return the path of the fixture '.pack' file with the given sha."""
        return os.path.join(self.datadir, 'pack-%s.pack' % sha)

    def test_create_pack(self):
        """Constructing a PackData from the fixture must not raise."""
        self.get_pack_data(pack1_sha)

    def test_from_file(self):
        """PackData.from_file accepts an already opened file and its size."""
        path = self._pack_path(pack1_sha)
        # Pack files are binary, so open in 'rb', and always release the
        # handle instead of leaving it to the garbage collector.
        f = open(path, 'rb')
        try:
            PackData.from_file(f, os.path.getsize(path))
        finally:
            f.close()

    # TODO: more ThinPackData tests.
    def test_thin_from_file(self):
        """ThinPackData consults the resolve callback for external refs."""
        test_sha = '1' * 40

        def resolve(sha):
            # Only the sha requested below may ever be looked up.
            self.assertEqual(test_sha, sha)
            return 3, 'data'

        path = self._pack_path(pack1_sha)
        f = open(path, 'rb')
        try:
            data = ThinPackData.from_file(resolve, f, os.path.getsize(path))
            idx = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, idx)
            self.assertEqual((None, 3, 'data'), data.get_ref(test_sha))
        finally:
            # Closed only after every use of ``data`` above.
            f.close()

    def test_pack_len(self):
        """The fixture pack holds three objects."""
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(3, len(p))

    def test_index_check(self):
        """check() on the fixture pack data must not raise ChecksumMismatch."""
        p = self.get_pack_data(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterobjects(self):
        """iterobjects() yields (offset, type_num, chunks, crc32) per object."""
        p = self.get_pack_data(pack1_sha)
        commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                       'author James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       'committer James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       '\n'
                       'Test commit\n')
        blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
        tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
        actual = []
        for offset, type_num, chunks, crc32 in p.iterobjects():
            actual.append((offset, type_num, ''.join(chunks), crc32))
        # CRC values are written without the Python-2-only 'L' suffix; plain
        # integer literals compare equal to longs and also parse on Python 3.
        self.assertEqual([
            (12, 1, commit_data, 3775879613),
            (138, 2, tree_data, 912998690),
            (178, 3, 'test 1\n', 1373561701),
            ], actual)

    def test_iterentries(self):
        """iterentries() yields (sha, offset, crc32) for every object."""
        p = self.get_pack_data(pack1_sha)
        entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
        self.assertEqual(set([
            ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
            ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
            ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
            ]), entries)

    def test_create_index_v1(self):
        """An index written by create_index_v1 equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v1test.idx')
        p.create_index_v1(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        """An index written by create_index_v2 equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v2test.idx')
        p.create_index_v2(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)
238
239
class TestPack(PackTests):
    """Tests for Pack objects loaded from the fixture pack."""

    def test_len(self):
        """The fixture pack holds three objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(3, len(p))

    def test_contains(self):
        """``in`` finds an object by its hex SHA."""
        p = self.get_pack(pack1_sha)
        self.assertTrue(tree_sha in p)

    def test_get(self):
        """Indexing by hex SHA returns an object of the matching class."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        """Iterating the pack yields the hex SHAs of its objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        p = self.get_pack(pack1_sha)
        obj = p[a_sha]
        self.assertEqual(obj.type_name, 'blob')
        self.assertEqual(obj.sha().hexdigest(), a_sha)
        obj = p[tree_sha]
        self.assertEqual(obj.type_name, 'tree')
        self.assertEqual(obj.sha().hexdigest(), tree_sha)
        obj = p[commit_sha]
        self.assertEqual(obj.type_name, 'commit')
        self.assertEqual(obj.sha().hexdigest(), commit_sha)

    def test_copy(self):
        """Rewriting the pack's objects with write_pack gives an equal pack."""
        origpack = self.get_pack(pack1_sha)
        self.assertSucceeds(origpack.index.check)
        basename = os.path.join(self.tempdir, 'Elch')
        write_pack(basename, [(x, '') for x in origpack.iterobjects()],
                   len(origpack))
        newpack = Pack(basename)
        self.assertEqual(origpack, newpack)
        self.assertSucceeds(newpack.index.check)
        self.assertEqual(origpack.name(), newpack.name())
        self.assertEqual(origpack.index.get_pack_checksum(),
                         newpack.index.get_pack_checksum())

        # The index files' own checksums are only required to match when both
        # indexes have the same version.
        wrong_version = origpack.index.version != newpack.index.version
        orig_checksum = origpack.index.get_stored_checksum()
        new_checksum = newpack.index.get_stored_checksum()
        self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        """The commit in the fixture has the expected author and no parents."""
        p = self.get_pack(pack1_sha)
        commit = p[commit_sha]
        self.assertEqual('James Westby <jw+debian@jameswestby.net>',
                         commit.author)
        self.assertEqual([], commit.parents)

    def test_name(self):
        """name() returns the sha that appears in the pack's file name."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(pack1_sha, p.name())
299
300
class WritePackHeaderTests(TestCase):
    """Tests for write_pack_header()."""

    def test_simple(self):
        """Writing a header for 42 objects produces the expected 12 bytes."""
        out = StringIO()
        write_pack_header(out, 42)
        # 'PACK', then two 4-byte fields; the trailing '*' is chr(42).
        expected = 'PACK\x00\x00\x00\x02\x00\x00\x00*'
        self.assertEqual(expected, out.getvalue())
308
309
# Pack checksum handed to the index writers in the tests below, converted from
# hex with hex_to_sha(). It is the same hex value that
# PackIndexTests.test_get_stored_checksum expects from get_pack_checksum().
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
311
312
class BaseTestPackIndexWriting(object):
    """Shared tests for the pack index writers.

    This is a mixin, not a TestCase: concrete subclasses combine it with
    TestCase and must set, before the tests run:

    * ``_write_fn``: callable invoked as ``_write_fn(f, entries, checksum)``
    * ``_expected_version``: index version the written file should report
    * ``_has_crc32_checksum``: whether entries read back carry a CRC32
    """

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def assertSucceeds(self, func, *args, **kwargs):
        """Call func(*args, **kwargs); fail the test on ChecksumMismatch."""
        try:
            func(*args, **kwargs)
        # "except X as e" works on Python 2.6+ and 3; "except X, e" does not
        # parse on Python 3.
        except ChecksumMismatch as e:
            self.fail(e)

    def writeIndex(self, filename, entries, pack_checksum):
        """Write an index for entries to filename using ``self._write_fn``.

        :param filename: path of the index file to create
        :param entries: sequence of (sha, offset, crc32) tuples
        :param pack_checksum: checksum of the pack, stored in the index
        """
        # FIXME: Write to StringIO instead rather than hitting disk ?
        f = GitFile(filename, "wb")
        try:
            self._write_fn(f, entries, pack_checksum)
        finally:
            f.close()

    def test_empty(self):
        """An index with no entries can be written and read back."""
        filename = os.path.join(self.tempdir, 'empty.idx')
        self.writeIndex(filename, [], pack_checksum)
        idx = load_pack_index(filename)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_single(self):
        """A one-entry index round-trips its sha, offset and (maybe) CRC."""
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        filename = os.path.join(self.tempdir, 'single.idx')
        self.writeIndex(filename, my_entries, pack_checksum)
        idx = load_pack_index(filename)
        self.assertEqual(idx.version, self._expected_version)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                # Writers without CRC support must read back None.
                self.assertTrue(actual_crc is None)
364
365
class TestPackIndexWritingv1(TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v1."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestPackIndexWriting.setUp(self)
        self._has_crc32_checksum = False
        self._expected_version = 1
        # Stored on the instance (not the class) so that self._write_fn(...)
        # calls the plain function without passing self as first argument.
        self._write_fn = write_pack_index_v1

    def tearDown(self):
        # Tear down in the reverse order of setUp, so the temporary directory
        # is removed even if TestCase.tearDown raises.
        BaseTestPackIndexWriting.tearDown(self)
        TestCase.tearDown(self)
378
379
class TestPackIndexWritingv2(TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v2."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestPackIndexWriting.setUp(self)
        self._has_crc32_checksum = True
        self._expected_version = 2
        # Stored on the instance (not the class) so that self._write_fn(...)
        # calls the plain function without passing self as first argument.
        self._write_fn = write_pack_index_v2

    def tearDown(self):
        # Tear down in the reverse order of setUp, so the temporary directory
        # is removed even if TestCase.tearDown raises.
        BaseTestPackIndexWriting.tearDown(self)
        TestCase.tearDown(self)
392
393
class ReadZlibTests(TestCase):
    """Tests for read_zlib_chunks().

    Each test reads from a stream made of one zlib-compressed object
    (``comp``) immediately followed by unrelated trailing bytes (``extra``).
    """

    decomp = (
      'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      'parent None\n'
      'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      '\n'
      "Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    extra = 'nextobject'

    def _new_read(self):
        """Return the read method of a fresh stream of comp + extra."""
        return StringIO(self.comp + self.extra).read

    def setUp(self):
        super(ReadZlibTests, self).setUp()
        self.read = self._new_read()

    def test_decompress_size(self):
        """A negative or wrong expected size is rejected."""
        good_decomp_len = len(self.decomp)
        # Every call gets its own stream: a call that consumes input would
        # otherwise leave later calls reading from an exhausted stream, and
        # they could raise for that reason rather than for the size mismatch
        # this test is about.
        self.assertRaises(ValueError, read_zlib_chunks, self._new_read(), -1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_read(),
                          good_decomp_len - 1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_read(),
                          good_decomp_len + 1)

    def test_decompress_truncated(self):
        """Truncated input, or input with nothing after it, is an error."""
        read = StringIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

        # Complete compressed data but no trailing bytes after it.
        read = StringIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

    def test_decompress_empty(self):
        """A compressed empty string decompresses to nothing."""
        comp = zlib.compress('')
        read = StringIO(comp + self.extra).read
        decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
        self.assertEqual('', ''.join(decomp))
        self.assertEqual(len(comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Bytes read past the object plus the rest of the stream give extra.
        self.assertEqual(self.extra, unused_data + read())

    def _do_decompress_test(self, buffer_size):
        """Decompress the fixture reading buffer_size bytes at a time."""
        decomp, comp_len, unused_data = read_zlib_chunks(
          self.read, len(self.decomp), buffer_size=buffer_size)
        self.assertEqual(self.decomp, ''.join(decomp))
        self.assertEqual(len(self.comp), comp_len)
        self.assertNotEqual('', unused_data)
        self.assertEqual(self.extra, unused_data + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)