New dulwich.pack.MemoryPackIndex class.
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_pack.py
1 # test_pack.py -- Tests for the handling of git packs.
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; version 2
8 # of the License, or (at your option) any later version of the license.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20 """Tests for Dulwich packs."""
21
22
23 from cStringIO import StringIO
24 import os
25 import shutil
26 import tempfile
27 import zlib
28
29 from dulwich.errors import (
30     ChecksumMismatch,
31     )
32 from dulwich.file import (
33     GitFile,
34     )
35 from dulwich.objects import (
36     hex_to_sha,
37     sha_to_hex,
38     Tree,
39     )
40 from dulwich.pack import (
41     MemoryPackIndex,
42     Pack,
43     PackData,
44     ThinPackData,
45     apply_delta,
46     create_delta,
47     load_pack_index,
48     read_zlib_chunks,
49     write_pack_header,
50     write_pack_index_v1,
51     write_pack_index_v2,
52     write_pack,
53     )
54 from dulwich.tests import (
55     TestCase,
56     )
57
# Identifier of the sample pack; the fixture files in the data directory are
# named 'pack-<pack1_sha>.idx' / 'pack-<pack1_sha>.pack'.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex ids of the three objects stored in the sample pack: a blob, the tree
# and the commit (see the type checks in TestPack.test_get_object_at).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
63
64
class PackTests(TestCase):
    """Base class for testing packs.

    Gives every test its own scratch directory (``self.tempdir``) and
    offers helpers that load the sample pack fixtures from ``datadir``.
    """

    # Directory with the pack fixtures, located next to this module.
    datadir = os.path.join(os.path.dirname(__file__), 'data/packs')

    def setUp(self):
        """Create the per-test scratch directory."""
        super(PackTests, self).setUp()
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the per-test scratch directory."""
        shutil.rmtree(self.tempdir)
        super(PackTests, self).tearDown()

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha"""
        return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha"""
        return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))

    def get_pack(self, sha):
        """Returns a Pack object from the datadir with the given sha"""
        return Pack(os.path.join(self.datadir, 'pack-%s' % sha))

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and fail on a ChecksumMismatch.

        Any other exception propagates unchanged.
        """
        try:
            func(*args, **kwargs)
        # "except X as e" is valid on Python 2.6+ and Python 3, whereas the
        # old "except X, e" spelling is a SyntaxError on Python 3.
        except ChecksumMismatch as e:
            self.fail(e)
94
95
class PackIndexTests(PackTests):
    """Class that tests the index of packfiles"""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        # The pack's own name is not an object stored in the pack.
        self.assertRaises(KeyError, p.object_index, pack1_sha)
        self.assertEqual(p.object_index(a_sha), 178)
        self.assertEqual(p.object_index(tree_sha), 138)
        self.assertEqual(p.object_index(commit_sha), 12)

    def test_index_len(self):
        """Tests that the index reports three entries."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self):
        """Tests the index checksum and the pack checksum kept in the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(p.get_stored_checksum()))
        self.assertEqual('721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(p.get_pack_checksum()))

    def test_index_check(self):
        """Tests that checking the index raises no ChecksumMismatch."""
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self):
        """Tests the (sha, offset, crc32) triples yielded by the index."""
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        # The sample index is expected to yield None for every crc32.
        self.assertEqual([
          ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
          ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
          ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
          ], entries)

    def test_iter(self):
        """Tests that iterating the index yields the hex object ids."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
134
135
class TestPackDeltas(TestCase):
    """Round-trip tests for create_delta() / apply_delta()."""

    test_string1 = 'The answer was flailing in the wind'
    test_string2 = 'The answer was falling down the pipe'
    test_string3 = 'zzzzz'

    test_string_empty = ''
    test_string_big = 'Z' * 8192

    def _test_roundtrip(self, base, target):
        """Assert that applying the delta base->target to base gives target."""
        delta = create_delta(base, target)
        # apply_delta() returns chunks; join them before comparing.
        self.assertEqual(target, ''.join(apply_delta(base, delta)))

    def test_nochange(self):
        """Target identical to the base."""
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_change(self):
        """Target sharing a prefix with the base."""
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        """Target unrelated to the base."""
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_overflow(self):
        """Large target built from an empty base."""
        self._test_roundtrip(self.test_string_empty, self.test_string_big)
160
161
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        """Tests that the sample pack data can be opened without error."""
        self.get_pack_data(pack1_sha)

    def test_from_file(self):
        """Tests building a PackData from an already opened file object."""
        path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
        # Packs are binary: open in 'rb' and always release the handle.
        f = open(path, 'rb')
        try:
            PackData.from_file(f, os.path.getsize(path))
        finally:
            f.close()

    # TODO: more ThinPackData tests.
    def test_thin_from_file(self):
        """Tests that ThinPackData asks the resolver for external refs."""
        test_sha = '1' * 40

        def resolve(sha):
            self.assertEqual(test_sha, sha)
            return 3, 'data'

        path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
        # Packs are binary: open in 'rb' and always release the handle.
        f = open(path, 'rb')
        try:
            data = ThinPackData.from_file(resolve, f, os.path.getsize(path))
            idx = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, idx)
            self.assertEqual((None, 3, 'data'), data.get_ref(test_sha))
        finally:
            f.close()

    def test_pack_len(self):
        """Tests that the pack data reports three objects."""
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(3, len(p))

    def test_index_check(self):
        """Tests that checking the pack data raises no ChecksumMismatch."""
        p = self.get_pack_data(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterobjects(self):
        """Tests offset, type, content and crc32 of every stored object."""
        p = self.get_pack_data(pack1_sha)
        commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                       'author James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       'committer James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       '\n'
                       'Test commit\n')
        blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
        tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
        actual = []
        for offset, type_num, chunks, crc32 in p.iterobjects():
            actual.append((offset, type_num, ''.join(chunks), crc32))
        # No 'L' suffix on the crc32 values: big literals are promoted
        # automatically on Python 2 and the suffix is invalid on Python 3.
        self.assertEqual([
          (12, 1, commit_data, 3775879613),
          (138, 2, tree_data, 912998690),
          (178, 3, 'test 1\n', 1373561701)
          ], actual)

    def test_iterentries(self):
        """Tests the (sha, offset, crc32) triples computed from the pack."""
        p = self.get_pack_data(pack1_sha)
        entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
        self.assertEqual(set([
          ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
          ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
          ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
          ]), entries)

    def test_create_index_v1(self):
        """Tests that a freshly written v1 index equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v1test.idx')
        p.create_index_v1(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        """Tests that a freshly written v2 index equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v2test.idx')
        p.create_index_v2(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)
239
240
class TestPack(PackTests):
    """Tests for Pack objects built from the sample pack fixture."""

    def test_len(self):
        """Tests that the pack reports three objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(3, len(p))

    def test_contains(self):
        """Tests membership lookup by hex object id."""
        p = self.get_pack(pack1_sha)
        self.assertTrue(tree_sha in p)

    def test_get(self):
        """Tests that item access on the tree id returns a Tree."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        """Tests that iterating the pack yields the hex object ids."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        p = self.get_pack(pack1_sha)
        obj = p[a_sha]
        self.assertEqual(obj.type_name, 'blob')
        self.assertEqual(obj.sha().hexdigest(), a_sha)
        obj = p[tree_sha]
        self.assertEqual(obj.type_name, 'tree')
        self.assertEqual(obj.sha().hexdigest(), tree_sha)
        obj = p[commit_sha]
        self.assertEqual(obj.type_name, 'commit')
        self.assertEqual(obj.sha().hexdigest(), commit_sha)

    def test_copy(self):
        """Tests that rewriting every object yields an equal pack."""
        origpack = self.get_pack(pack1_sha)
        self.assertSucceeds(origpack.index.check)
        basename = os.path.join(self.tempdir, 'Elch')
        write_pack(basename, [(x, '') for x in origpack.iterobjects()],
                   len(origpack))
        newpack = Pack(basename)
        self.assertEqual(origpack, newpack)
        self.assertSucceeds(newpack.index.check)
        self.assertEqual(origpack.name(), newpack.name())
        self.assertEqual(origpack.index.get_pack_checksum(),
                         newpack.index.get_pack_checksum())

        # The index checksums are only required to match when both indexes
        # have the same version.
        wrong_version = origpack.index.version != newpack.index.version
        orig_checksum = origpack.index.get_stored_checksum()
        new_checksum = newpack.index.get_stored_checksum()
        self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        """Tests author and parents of the commit stored in the pack."""
        p = self.get_pack(pack1_sha)
        commit = p[commit_sha]
        self.assertEqual('James Westby <jw+debian@jameswestby.net>',
                         commit.author)
        self.assertEqual([], commit.parents)

    def test_name(self):
        """Tests that the pack name equals the sha in its file name."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(pack1_sha, p.name())
300
301
class WritePackHeaderTests(TestCase):
    """Tests for write_pack_header()."""

    def test_simple(self):
        """Tests the exact bytes written for a pack holding 42 objects."""
        f = StringIO()
        write_pack_header(f, 42)
        # Trailing '*' is chr(42), the last byte of the object count.
        self.assertEqual('PACK\x00\x00\x00\x02\x00\x00\x00*',
                         f.getvalue())
309
310
# Binary form of the pack checksum handed to the index writers below; the hex
# value is the same one PackIndexTests expects from get_pack_checksum().
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
312
313
class BaseTestPackIndexWriting(object):
    """Mixin holding the index tests shared by all index implementations.

    Concrete test cases must override ``index`` and set
    ``self._has_crc32_checksum`` (read by ``test_single``) before the tests
    run.
    """

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and fail on a ChecksumMismatch.

        Any other exception propagates unchanged.
        """
        try:
            func(*args, **kwargs)
        # "except X as e" is valid on Python 2.6+ and Python 3, whereas the
        # old "except X, e" spelling is a SyntaxError on Python 3.
        except ChecksumMismatch as e:
            self.fail(e)

    def index(self, filename, entries, pack_checksum):
        """Build the index under test; must be overridden by subclasses."""
        raise NotImplementedError(self.index)

    def test_empty(self):
        """Tests an index without entries."""
        idx = self.index('empty.idx', [], pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_single(self):
        """Tests an index holding exactly one entry."""
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        idx = self.index('single.idx', my_entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            # Implementations without crc32 support must report None.
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)
347
348
class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
    """Mixin for index implementations that are written to a file.

    Concrete test cases must set ``self._write_fn`` (the writer called by
    ``writeIndex``) and ``self._expected_version`` (checked by ``index``).
    """

    def setUp(self):
        """Create the scratch directory the index files are written to."""
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.tempdir)

    def index(self, filename, entries, pack_checksum):
        """Write the entries to ``filename`` in tempdir and load them back."""
        path = os.path.join(self.tempdir, filename)
        self.writeIndex(path, entries, pack_checksum)
        idx = load_pack_index(path)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.version, self._expected_version)
        return idx

    def writeIndex(self, filename, entries, pack_checksum):
        """Write an index file using ``self._write_fn``."""
        # FIXME: Write to StringIO instead rather than hitting disk ?
        f = GitFile(filename, "wb")
        try:
            self._write_fn(f, entries, pack_checksum)
        finally:
            f.close()
372
373
class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
    """Runs the shared index tests against MemoryPackIndex."""

    def index(self, filename, entries, pack_checksum):
        """Build the in-memory index; ``filename`` is not used."""
        return MemoryPackIndex(entries, pack_checksum)

    def setUp(self):
        TestCase.setUp(self)
        # Entries are expected to report the crc32 they were created with.
        self._has_crc32_checksum = True

    def tearDown(self):
        TestCase.tearDown(self)
385
386
class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index tests against version 1 index files."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # Entries read back from a v1 index are expected to carry no crc32.
        self._has_crc32_checksum = False
        self._expected_version = 1
        self._write_fn = write_pack_index_v1

    def tearDown(self):
        # Tear down in the reverse order of setUp, and make sure the base
        # TestCase is torn down even if removing the temp dir fails.
        try:
            BaseTestFilePackIndexWriting.tearDown(self)
        finally:
            TestCase.tearDown(self)
399
400
class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
    """Runs the shared index tests against version 2 index files."""

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        # Entries read back from a v2 index are expected to keep their crc32.
        self._has_crc32_checksum = True
        self._expected_version = 2
        self._write_fn = write_pack_index_v2

    def tearDown(self):
        # Tear down in the reverse order of setUp, and make sure the base
        # TestCase is torn down even if removing the temp dir fails.
        try:
            BaseTestFilePackIndexWriting.tearDown(self)
        finally:
            TestCase.tearDown(self)
413
414
class ReadZlibTests(TestCase):
    """Tests for read_zlib_chunks()."""

    decomp = (
      'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      'parent None\n'
      'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      '\n'
      "Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    # Trailing bytes that follow the compressed stream in the input.
    extra = 'nextobject'

    def _new_read(self):
        """Return the read method of a fresh stream of comp + extra."""
        return StringIO(self.comp + self.extra).read

    def setUp(self):
        super(ReadZlibTests, self).setUp()
        self.read = self._new_read()

    def test_decompress_size(self):
        """Tests that a wrong expected size is rejected."""
        good_decomp_len = len(self.decomp)
        # Every call gets its own unread stream, so that each assertion
        # exercises the size check rather than an already consumed input.
        self.assertRaises(ValueError, read_zlib_chunks, self._new_read(), -1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_read(),
                          good_decomp_len - 1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_read(),
                          good_decomp_len + 1)

    def test_decompress_truncated(self):
        """Tests input that ends inside or right after the zlib stream."""
        read = StringIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

        # Complete stream, but without any data following it.
        read = StringIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

    def test_decompress_empty(self):
        """Tests a zlib stream that decompresses to the empty string."""
        comp = zlib.compress('')
        read = StringIO(comp + self.extra).read
        decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
        self.assertEqual('', ''.join(decomp))
        self.assertEqual(len(comp), comp_len)
        self.assertNotEqual('', unused_data)
        self.assertEqual(self.extra, unused_data + read())

    def _do_decompress_test(self, buffer_size):
        """Decompress self.read with the given buffer size and verify it."""
        decomp, comp_len, unused_data = read_zlib_chunks(
          self.read, len(self.decomp), buffer_size=buffer_size)
        self.assertEqual(self.decomp, ''.join(decomp))
        self.assertEqual(len(self.comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Unused data plus the rest of the stream must give back the extra
        # bytes.
        self.assertEqual(self.extra, unused_data + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)