Add make_object and make_commit convenience functions to test utils.
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_pack.py
1 # test_pack.py -- Tests for the handling of git packs.
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; version 2
8 # of the License, or (at your option) any later version of the license.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20
21 """Tests for Dulwich packs."""
22
23
24 from cStringIO import StringIO
25 import os
26 import shutil
27 import tempfile
28 import unittest
29 import zlib
30
31 from dulwich.errors import (
32     ChecksumMismatch,
33     )
34 from dulwich.objects import (
35     hex_to_sha,
36     sha_to_hex,
37     Tree,
38     )
39 from dulwich.pack import (
40     Pack,
41     PackData,
42     apply_delta,
43     create_delta,
44     load_pack_index,
45     hex_to_sha,
46     read_zlib_chunks,
47     sha_to_hex,
48     write_pack_index_v1,
49     write_pack_index_v2,
50     write_pack,
51     )
52
# Name of the fixture pack used throughout this module; its files are looked
# up in the data directory as 'pack-<sha>.idx' and 'pack-<sha>.pack'.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex SHAs of the three objects the tests look up in that pack: a blob, a
# tree and a commit respectively (see TestPack.test_get_object_at).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
58
59
class PackTests(unittest.TestCase):
    """Base class for testing packs.

    Gives each test a scratch directory and helpers that open the fixture
    pack files stored in the 'data/packs' directory next to this module.
    """

    # Directory holding the fixture 'pack-<sha>.idx' / 'pack-<sha>.pack' files.
    datadir = os.path.join(os.path.dirname(__file__), 'data/packs')

    def setUp(self):
        """Create a temporary directory for files written by a test."""
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory created by setUp."""
        shutil.rmtree(self.tempdir)

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha"""
        idx_path = os.path.join(self.datadir, 'pack-%s.idx' % sha)
        return load_pack_index(idx_path)

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha"""
        pack_path = os.path.join(self.datadir, 'pack-%s.pack' % sha)
        return PackData(pack_path)

    def get_pack(self, sha):
        """Returns a Pack object from the datadir with the given sha"""
        return Pack(os.path.join(self.datadir, 'pack-%s' % sha))

    def assertSucceeds(self, func, *args, **kwargs):
        """Call func(*args, **kwargs), failing the test on ChecksumMismatch.

        Only ChecksumMismatch is turned into a test failure; any other
        exception raised by func propagates unchanged.
        """
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)
87
88
class PackIndexTests(PackTests):
    """Class that tests the index of packfiles"""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        # The pack's own name is not one of the objects stored in it.
        self.assertRaises(KeyError, p.object_index, pack1_sha)
        self.assertEqual(p.object_index(a_sha), 178)
        self.assertEqual(p.object_index(tree_sha), 138)
        self.assertEqual(p.object_index(commit_sha), 12)

    def test_index_len(self):
        """Tests that the index reports the three objects of the pack."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self):
        """Tests the index checksum and the pack checksum kept in the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(p.get_stored_checksum()))
        self.assertEqual('721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(p.get_pack_checksum()))

    def test_index_check(self):
        """Tests that checking the fixture index raises no ChecksumMismatch."""
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self):
        """Tests the (sha, offset, crc32) entries yielded by the index."""
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        # The fixture index yields None in the crc32 position of every entry.
        self.assertEqual([
          ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
          ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
          ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
          ], entries)

    def test_iter(self):
        """Tests that iterating the index yields the hex SHAs it contains."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
127
128
class TestPackDeltas(unittest.TestCase):
    """Round-trip tests for create_delta / apply_delta."""

    test_string1 = 'The answer was flailing in the wind'
    test_string2 = 'The answer was falling down the pipe'
    test_string3 = 'zzzzz'

    test_string_empty = ''
    test_string_big = 'Z' * 8192

    def _test_roundtrip(self, base, target):
        """Check that applying the delta base -> target to base gives target.

        apply_delta is expected to return a list of chunks, here a single
        chunk equal to the target.
        """
        delta = create_delta(base, target)
        self.assertEqual([target], apply_delta(base, delta))

    def test_nochange(self):
        """Delta between identical strings."""
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_change(self):
        """Delta between two strings sharing a common prefix."""
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        """Delta to an unrelated target string."""
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_overflow(self):
        """Delta from an empty base to an 8192-byte target."""
        self._test_roundtrip(self.test_string_empty, self.test_string_big)
153
154
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        """Tests that the fixture pack data can be opened at all."""
        self.get_pack_data(pack1_sha)

    def test_pack_len(self):
        """Tests that the pack data reports its three objects."""
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(3, len(p))

    def test_index_check(self):
        """Tests that checking the pack data raises no ChecksumMismatch."""
        p = self.get_pack_data(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterobjects(self):
        """Tests the (offset, type_num, data, crc32) tuples of each object."""
        p = self.get_pack_data(pack1_sha)
        commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                       'author James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       'committer James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       '\n'
                       'Test commit\n')
        # The tree has a single entry 'a' pointing at the blob's binary SHA.
        tree_data = '100644 a\0%s' % hex_to_sha(a_sha)
        # Objects come as chunk lists; join them to compare whole payloads.
        actual = [(offset, type_num, ''.join(chunks), crc32)
                  for offset, type_num, chunks, crc32 in p.iterobjects()]
        self.assertEqual([
          (12, 1, commit_data, 3775879613),
          (138, 2, tree_data, 912998690),
          (178, 3, 'test 1\n', 1373561701)
          ], actual)

    def test_iterentries(self):
        """Tests the (sha, offset, crc32) entries computed from the pack."""
        p = self.get_pack_data(pack1_sha)
        entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
        self.assertEqual(set([
          ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
          ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
          ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
          ]), entries)

    def test_create_index_v1(self):
        """Tests that a generated v1 index equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v1test.idx')
        p.create_index_v1(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        """Tests that a generated v2 index equals the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v2test.idx')
        p.create_index_v2(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)
213
214
class TestPack(PackTests):
    """Tests for the Pack object built from the fixture pack and its index."""

    def test_len(self):
        """Tests that the pack reports its three objects."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(3, len(p))

    def test_contains(self):
        """Tests membership lookup by hex SHA."""
        p = self.get_pack(pack1_sha)
        self.assertTrue(tree_sha in p)

    def test_get(self):
        """Tests that item access returns an object of the matching class."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        """Tests that iterating the pack yields the hex SHAs it contains."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        p = self.get_pack(pack1_sha)
        obj = p[a_sha]
        self.assertEqual(obj.type_name, 'blob')
        self.assertEqual(obj.sha().hexdigest(), a_sha)
        obj = p[tree_sha]
        self.assertEqual(obj.type_name, 'tree')
        self.assertEqual(obj.sha().hexdigest(), tree_sha)
        obj = p[commit_sha]
        self.assertEqual(obj.type_name, 'commit')
        self.assertEqual(obj.sha().hexdigest(), commit_sha)

    def test_copy(self):
        """Tests that rewriting the pack's objects gives an equal pack."""
        origpack = self.get_pack(pack1_sha)
        self.assertSucceeds(origpack.index.check)
        basename = os.path.join(self.tempdir, 'Elch')
        # write_pack is fed (object, '') pairs plus the number of objects.
        write_pack(basename, [(x, '') for x in origpack.iterobjects()],
                   len(origpack))
        newpack = Pack(basename)
        self.assertEqual(origpack, newpack)
        self.assertSucceeds(newpack.index.check)
        self.assertEqual(origpack.name(), newpack.name())
        self.assertEqual(origpack.index.get_pack_checksum(),
                         newpack.index.get_pack_checksum())

        # The index checksums are only compared when both indexes have the
        # same version; a version difference is accepted as-is.
        wrong_version = origpack.index.version != newpack.index.version
        orig_checksum = origpack.index.get_stored_checksum()
        new_checksum = newpack.index.get_stored_checksum()
        self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        """Tests the author and parents of the commit stored in the pack."""
        p = self.get_pack(pack1_sha)
        commit = p[commit_sha]
        self.assertEqual('James Westby <jw+debian@jameswestby.net>',
                         commit.author)
        self.assertEqual([], commit.parents)

    def test_name(self):
        """Tests that the pack's name is the SHA used in its file name."""
        p = self.get_pack(pack1_sha)
        self.assertEqual(pack1_sha, p.name())
274
275
# Binary form of the pack checksum handed to the index writers below; the
# same hex value is what the fixture index reports as its pack checksum.
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
277
278
class BaseTestPackIndexWriting(object):
    """Mixin with the index-writing tests shared by the v1 and v2 writers.

    This is a plain object subclass: the assert* methods used here come from
    the unittest.TestCase that concrete test classes mix in. Those classes
    must set, before the tests run:

    _write_fn: callable(filename, entries, pack_checksum) writing an index.
    _expected_version: version number the loaded index should report.
    _has_crc32_checksum: whether entries read back carry the CRC32 that was
        written (otherwise None is expected in that position).
    """

    def setUp(self):
        """Create a temporary directory for the index files."""
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory created by setUp."""
        shutil.rmtree(self.tempdir)

    def assertSucceeds(self, func, *args, **kwargs):
        """Call func(*args, **kwargs), failing the test on ChecksumMismatch.

        Any other exception raised by func propagates unchanged.
        """
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

    def test_empty(self):
        """Tests writing and reloading an index without entries."""
        filename = os.path.join(self.tempdir, 'empty.idx')
        self._write_fn(filename, [], pack_checksum)
        idx = load_pack_index(filename)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_single(self):
        """Tests writing and reloading an index with a single entry."""
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        # One (binary sha, offset, crc32) entry.
        my_entries = [(entry_sha, 178, 42)]
        filename = os.path.join(self.tempdir, 'single.idx')
        self._write_fn(filename, my_entries, pack_checksum)
        idx = load_pack_index(filename)
        self.assertEqual(idx.version, self._expected_version)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        # Lengths are compared first because zip() would silently drop a
        # surplus entry on either side.
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)
322
323
class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v1."""

    def setUp(self):
        """Run both bases' setUp, then select the version 1 writer."""
        for base in (unittest.TestCase, BaseTestPackIndexWriting):
            base.setUp(self)
        # Kept as an instance attribute, so it is called without self.
        self._write_fn = write_pack_index_v1
        self._expected_version = 1
        # The shared test then expects None as the CRC32 of read-back entries.
        self._has_crc32_checksum = False

    def tearDown(self):
        """Run both bases' tearDown, in the same order as setUp."""
        for base in (unittest.TestCase, BaseTestPackIndexWriting):
            base.tearDown(self)
336
337
class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v2."""

    def setUp(self):
        """Run both bases' setUp, then select the version 2 writer."""
        for base in (unittest.TestCase, BaseTestPackIndexWriting):
            base.setUp(self)
        # Kept as an instance attribute, so it is called without self.
        self._write_fn = write_pack_index_v2
        self._expected_version = 2
        # The shared test then expects the written CRC32 on read-back entries.
        self._has_crc32_checksum = True

    def tearDown(self):
        """Run both bases' tearDown, in the same order as setUp."""
        for base in (unittest.TestCase, BaseTestPackIndexWriting):
            base.tearDown(self)
350
351
class ReadZlibTests(unittest.TestCase):
    """Tests for read_zlib_chunks on a compressed object followed by data."""

    decomp = (
      'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      'parent None\n'
      'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      '\n'
      "Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    # Bytes following the compressed object in the simulated stream.
    extra = 'nextobject'

    def _new_reader(self):
        """Return the read method of a fresh stream of comp + extra."""
        return StringIO(self.comp + self.extra).read

    def setUp(self):
        """Give each test its own reader over comp + extra."""
        self.read = self._new_reader()

    def test_decompress_size(self):
        """Tests that a wrong expected size is rejected."""
        good_decomp_len = len(self.decomp)
        # Every case gets its own reader so that an earlier call cannot
        # consume input that a later case still needs.
        self.assertRaises(ValueError, read_zlib_chunks, self._new_reader(),
                          -1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_reader(),
                          good_decomp_len - 1)
        self.assertRaises(zlib.error, read_zlib_chunks, self._new_reader(),
                          good_decomp_len + 1)

    def test_decompress_truncated(self):
        """Tests that a stream ending too early is rejected."""
        # Cut off in the middle of the compressed data.
        read = StringIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

        # Complete compressed data, but nothing following it.
        read = StringIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

    def test_decompress_empty(self):
        """Tests reading an empty compressed object followed by other data."""
        comp = zlib.compress('')
        read = StringIO(comp + self.extra).read
        decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
        self.assertEqual('', ''.join(decomp))
        self.assertEqual(len(comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Returned leftovers plus what remains in the stream make up extra.
        self.assertEqual(self.extra, unused_data + read())

    def _do_decompress_test(self, buffer_size):
        """Read the object with the given buffer size and check the result."""
        decomp, comp_len, unused_data = read_zlib_chunks(
          self.read, len(self.decomp), buffer_size=buffer_size)
        self.assertEqual(self.decomp, ''.join(decomp))
        self.assertEqual(len(self.comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Returned leftovers plus what remains in the stream make up extra.
        self.assertEqual(self.extra, unused_data + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)