423d3b1a43e6908c8c5757ecc31f6129b724b1ef
[jelmer/dulwich-libgit2.git] / dulwich / tests / test_repository.py
1 # test_repository.py -- tests for repository.py
2 # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3 #
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; version 2
7 # of the License or (at your option) any later version of
8 # the License.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # MA  02110-1301, USA.
19
20 """Tests for the repository."""
21
22 from cStringIO import StringIO
23 import os
24 import shutil
25 import tempfile
26 import unittest
27 import warnings
28
29 from dulwich import errors
30 from dulwich.object_store import (
31     tree_lookup_path,
32     )
33 from dulwich import objects
34 from dulwich.repo import (
35     check_ref_format,
36     DictRefsContainer,
37     Repo,
38     read_packed_refs,
39     read_packed_refs_with_peeled,
40     write_packed_refs,
41     _split_ref_line,
42     )
43 from dulwich.tests.utils import (
44     open_repo,
45     tear_down_repo,
46     )
47
# SHA that the tests below expect NOT to be present in the fixture
# repositories: looking it up must raise KeyError (get_object) or
# MissingCommitError (revision_history).
missing_sha = 'b91fa4d900e17e99b433218e988c4eb4a3e9a097'
49
50
class CreateRepositoryTests(unittest.TestCase):
    """Tests for creating a fresh bare repository."""

    def assertFileContentsEqual(self, expected, repo, path):
        """Assert that a named file of ``repo`` has the expected contents.

        :param expected: Expected contents, or None when the file is
            expected to be missing.
        :param repo: Repository whose ``get_named_file`` is queried.
        :param path: Path passed to ``repo.get_named_file``.
        """
        f = repo.get_named_file(path)
        if not f:
            # A falsy result means there is no such file; that is only
            # acceptable when the caller expected None.
            self.assertEqual(expected, None)
        else:
            try:
                self.assertEqual(expected, f.read())
            finally:
                f.close()

    def _check_repo_contents(self, repo):
        """Check the state expected of a newly initialised bare repo."""
        self.assertTrue(repo.bare)
        self.assertFileContentsEqual('Unnamed repository', repo, 'description')
        self.assertFileContentsEqual('', repo, os.path.join('info', 'exclude'))
        self.assertFileContentsEqual(None, repo, 'nonexistent file')

    def test_create_disk(self):
        tmp_dir = tempfile.mkdtemp()
        try:
            repo = Repo.init_bare(tmp_dir)
            self.assertEqual(tmp_dir, repo._controldir)
            self._check_repo_contents(repo)
        finally:
            # Always remove the scratch directory, even on failure.
            shutil.rmtree(tmp_dir)
77
78
class RepositoryTests(unittest.TestCase):
    """Tests that read refs, objects and history from fixture repositories.

    Every test stores the repository it opens on ``self._repo`` so that
    ``tearDown`` can hand it to ``tear_down_repo``.
    """

    def setUp(self):
        self._repo = None

    def tearDown(self):
        if self._repo is not None:
            tear_down_repo(self._repo)

    def _call_ignoring_deprecation(self, func, *args):
        """Call ``func(*args)`` with DeprecationWarning silenced.

        The warning filters are reset afterwards, whether or not the call
        raised.

        :param func: Callable to invoke.
        :param args: Positional arguments for ``func``.
        :return: Whatever ``func`` returns.
        """
        warnings.simplefilter("ignore", DeprecationWarning)
        try:
            return func(*args)
        finally:
            warnings.resetwarnings()

    def test_simple_props(self):
        r = self._repo = open_repo('a.git')
        self.assertEqual(r.controldir(), r.path)

    def test_ref(self):
        r = self._repo = open_repo('a.git')
        self.assertEqual(r.ref('refs/heads/master'),
                         'a90fa2d900a17e99b433217e988c4eb4a2e9a097')

    def test_setitem(self):
        r = self._repo = open_repo('a.git')
        r["refs/tags/foo"] = 'a90fa2d900a17e99b433217e988c4eb4a2e9a097'
        self.assertEqual('a90fa2d900a17e99b433217e988c4eb4a2e9a097',
                         r["refs/tags/foo"].id)

    def test_get_refs(self):
        r = self._repo = open_repo('a.git')
        self.assertEqual({
            'HEAD': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
            'refs/heads/master': 'a90fa2d900a17e99b433217e988c4eb4a2e9a097',
            'refs/tags/mytag': '28237f4dc30d0d462658d6b937b08a0f0b6ef55a',
            'refs/tags/mytag-packed': 'b0931cadc54336e78a1d980420e3268903b57a50',
            }, r.get_refs())

    def test_head(self):
        r = self._repo = open_repo('a.git')
        self.assertEqual(r.head(), 'a90fa2d900a17e99b433217e988c4eb4a2e9a097')

    def test_get_object(self):
        r = self._repo = open_repo('a.git')
        obj = r.get_object(r.head())
        self.assertEqual(obj.type_name, 'commit')

    def test_get_object_non_existant(self):
        r = self._repo = open_repo('a.git')
        self.assertRaises(KeyError, r.get_object, missing_sha)

    def test_contains_object(self):
        r = self._repo = open_repo('a.git')
        self.assertTrue(r.head() in r)

    def test_contains_ref(self):
        r = self._repo = open_repo('a.git')
        self.assertTrue("HEAD" in r)

    def test_contains_missing(self):
        r = self._repo = open_repo('a.git')
        self.assertFalse("bar" in r)

    def test_commit(self):
        r = self._repo = open_repo('a.git')
        obj = self._call_ignoring_deprecation(r.commit, r.head())
        self.assertEqual(obj.type_name, 'commit')

    def test_commit_not_commit(self):
        r = self._repo = open_repo('a.git')
        self._call_ignoring_deprecation(
            self.assertRaises, errors.NotCommitError,
            r.commit, '4f2e6529203aa6d44b5af6e3292c837ceda003f9')

    def test_tree(self):
        r = self._repo = open_repo('a.git')
        commit = r[r.head()]
        tree = self._call_ignoring_deprecation(r.tree, commit.tree)
        self.assertEqual(tree.type_name, 'tree')
        self.assertEqual(tree.sha().hexdigest(), commit.tree)

    def test_tree_not_tree(self):
        r = self._repo = open_repo('a.git')
        self._call_ignoring_deprecation(
            self.assertRaises, errors.NotTreeError, r.tree, r.head())

    def test_tag(self):
        r = self._repo = open_repo('a.git')
        tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
        tag = self._call_ignoring_deprecation(r.tag, tag_sha)
        self.assertEqual(tag.type_name, 'tag')
        self.assertEqual(tag.sha().hexdigest(), tag_sha)
        obj_class, obj_sha = tag.object
        self.assertEqual(obj_class, objects.Commit)
        self.assertEqual(obj_sha, r.head())

    def test_tag_not_tag(self):
        r = self._repo = open_repo('a.git')
        self._call_ignoring_deprecation(
            self.assertRaises, errors.NotTagError, r.tag, r.head())

    def test_get_peeled(self):
        # unpacked ref
        r = self._repo = open_repo('a.git')
        tag_sha = '28237f4dc30d0d462658d6b937b08a0f0b6ef55a'
        self.assertNotEqual(r[tag_sha].sha().hexdigest(), r.head())
        self.assertEqual(r.get_peeled('refs/tags/mytag'), r.head())

        # packed ref with cached peeled value
        packed_tag_sha = 'b0931cadc54336e78a1d980420e3268903b57a50'
        parent_sha = r[r.head()].parents[0]
        self.assertNotEqual(r[packed_tag_sha].sha().hexdigest(), parent_sha)
        self.assertEqual(r.get_peeled('refs/tags/mytag-packed'), parent_sha)

        # TODO: add more corner cases to test repo

    def test_get_peeled_not_tag(self):
        r = self._repo = open_repo('a.git')
        self.assertEqual(r.get_peeled('HEAD'), r.head())

    def test_get_blob(self):
        r = self._repo = open_repo('a.git')
        commit = r[r.head()]
        tree = r[commit.tree]
        # Third field of the first tree entry is used as the blob SHA.
        blob_sha = tree.entries()[0][2]
        blob = self._call_ignoring_deprecation(r.get_blob, blob_sha)
        self.assertEqual(blob.type_name, 'blob')
        self.assertEqual(blob.sha().hexdigest(), blob_sha)

    def test_get_blob_notblob(self):
        r = self._repo = open_repo('a.git')
        self._call_ignoring_deprecation(
            self.assertRaises, errors.NotBlobError, r.get_blob, r.head())

    def test_linear_history(self):
        r = self._repo = open_repo('a.git')
        history = r.revision_history(r.head())
        shas = [c.sha().hexdigest() for c in history]
        self.assertEqual(shas, [r.head(),
                                '2a72d929692c41d8554c07f6301757ba18a65d91'])

    def test_merge_history(self):
        r = self._repo = open_repo('simple_merge.git')
        history = r.revision_history(r.head())
        shas = [c.sha().hexdigest() for c in history]
        self.assertEqual(shas, ['5dac377bdded4c9aeb8dff595f0faeebcc8498cc',
                                'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
                                '4cffe90e0a41ad3f5190079d7c8f036bde29cbe6',
                                '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
                                '0d89f20333fbb1d2f3a94da77f4981373d8f4310'])

    def test_revision_history_missing_commit(self):
        r = self._repo = open_repo('simple_merge.git')
        self.assertRaises(errors.MissingCommitError, r.revision_history,
                          missing_sha)

    def test_out_of_order_merge(self):
        """Test that revision history is ordered by date, not parent order."""
        r = self._repo = open_repo('ooo_merge.git')
        history = r.revision_history(r.head())
        shas = [c.sha().hexdigest() for c in history]
        self.assertEqual(shas, ['7601d7f6231db6a57f7bbb79ee52e4d462fd44d1',
                                'f507291b64138b875c28e03469025b1ea20bc614',
                                'fb5b0425c7ce46959bec94d54b9a157645e114f5',
                                'f9e39b120c68182a4ba35349f832d0e4e61f485c'])

    def test_get_tags_empty(self):
        r = self._repo = open_repo('ooo_merge.git')
        self.assertEqual({}, r.refs.as_dict('refs/tags'))

    def test_get_config(self):
        r = self._repo = open_repo('ooo_merge.git')
        self.assertEqual({}, r.get_config())

    def test_common_revisions(self):
        """
        This test demonstrates that ``find_common_revisions()`` actually returns
        common heads, not revisions; dulwich already uses
        ``find_common_revisions()`` in such a manner (see
        ``Repo.fetch_objects()``).
        """

        expected_shas = set(['60dacdc733de308bb77bb76ce0fb0f9b44c9769e'])

        # Source for objects. Kept on self._repo so that tearDown disposes of
        # it, like every other repository opened in this class.
        r_base = self._repo = open_repo('simple_merge.git')

        # Re-create each-side of the merge in simple_merge.git.
        #
        # Since the trees and blobs are missing, the repository created is
        # corrupted, but we're only checking for commits for the purpose of this
        # test, so it's immaterial.
        r1_dir = tempfile.mkdtemp()
        r1_commits = ['ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd', # HEAD
                      '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
                      '0d89f20333fbb1d2f3a94da77f4981373d8f4310']

        r2_dir = tempfile.mkdtemp()
        r2_commits = ['4cffe90e0a41ad3f5190079d7c8f036bde29cbe6', # HEAD
                      '60dacdc733de308bb77bb76ce0fb0f9b44c9769e',
                      '0d89f20333fbb1d2f3a94da77f4981373d8f4310']

        try:
            r1 = Repo.init_bare(r1_dir)
            for sha in r1_commits:
                r1.object_store.add_object(r_base.get_object(sha))
            r1.refs['HEAD'] = r1_commits[0]

            r2 = Repo.init_bare(r2_dir)
            for sha in r2_commits:
                r2.object_store.add_object(r_base.get_object(sha))
            r2.refs['HEAD'] = r2_commits[0]

            # Finally, the 'real' testing!
            shas = r2.object_store.find_common_revisions(r1.get_graph_walker())
            self.assertEqual(set(shas), expected_shas)

            shas = r1.object_store.find_common_revisions(r2.get_graph_walker())
            self.assertEqual(set(shas), expected_shas)
        finally:
            shutil.rmtree(r1_dir)
            shutil.rmtree(r2_dir)
324
325
class BuildRepoTests(unittest.TestCase):
    """Tests that build on-disk repos from scratch.

    Repos live in a temp dir and are torn down after each test. They start with
    a single commit in master having single file named 'a'.
    """

    def setUp(self):
        # The repository lives in a 'test' subdirectory of a fresh temp dir.
        repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
        os.makedirs(repo_dir)
        r = self._repo = Repo.init(repo_dir)
        self.assertFalse(r.bare)
        # A new repo has a symbolic HEAD pointing at a branch that does not
        # exist yet.
        self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
        self.assertRaises(KeyError, lambda: r.refs['refs/heads/master'])

        f = open(os.path.join(r.path, 'a'), 'wb')
        try:
            f.write('file contents')
        finally:
            f.close()
        r.stage(['a'])
        commit_sha = r.do_commit('msg',
                                 committer='Test Committer <test@nodomain.com>',
                                 author='Test Author <test@nodomain.com>',
                                 commit_timestamp=12345, commit_timezone=0,
                                 author_timestamp=12345, author_timezone=0)
        self.assertEqual([], r[commit_sha].parents)
        self._root_commit = commit_sha

    def tearDown(self):
        tear_down_repo(self._repo)

    def test_build_repo(self):
        r = self._repo
        self.assertEqual('ref: refs/heads/master', r.refs.read_ref('HEAD'))
        self.assertEqual(self._root_commit, r.refs['refs/heads/master'])
        expected_blob = objects.Blob.from_string('file contents')
        self.assertEqual(expected_blob.data, r[expected_blob.id].data)
        actual_commit = r[self._root_commit]
        self.assertEqual('msg', actual_commit.message)

    def test_commit_modified(self):
        r = self._repo
        f = open(os.path.join(r.path, 'a'), 'wb')
        try:
            f.write('new contents')
        finally:
            f.close()
        r.stage(['a'])
        commit_sha = r.do_commit('modified a',
                                 committer='Test Committer <test@nodomain.com>',
                                 author='Test Author <test@nodomain.com>',
                                 commit_timestamp=12395, commit_timezone=0,
                                 author_timestamp=12395, author_timezone=0)
        self.assertEqual([self._root_commit], r[commit_sha].parents)
        _, blob_id = tree_lookup_path(r.get_object, r[commit_sha].tree, 'a')
        self.assertEqual('new contents', r[blob_id].data)

    def test_commit_deleted(self):
        r = self._repo
        os.remove(os.path.join(r.path, 'a'))
        r.stage(['a'])
        commit_sha = r.do_commit('deleted a',
                                 committer='Test Committer <test@nodomain.com>',
                                 author='Test Author <test@nodomain.com>',
                                 commit_timestamp=12395, commit_timezone=0,
                                 author_timestamp=12395, author_timezone=0)
        self.assertEqual([self._root_commit], r[commit_sha].parents)
        self.assertEqual([], list(r.open_index()))
        tree = r[r[commit_sha].tree]
        # Materialise the entries so the comparison also holds when
        # iteritems() hands back an iterator rather than a list.
        self.assertEqual([], list(tree.iteritems()))

    def test_commit_fail_ref(self):
        r = self._repo

        # Make every compare-and-set on the refs container report failure.
        def set_if_equals(name, old_ref, new_ref):
            return False
        r.refs.set_if_equals = set_if_equals

        # The branch already exists, so add_if_new must never be reached.
        def add_if_new(name, new_ref):
            self.fail('Unexpected call to add_if_new')
        r.refs.add_if_new = add_if_new

        old_shas = set(r.object_store)
        self.assertRaises(errors.CommitError, r.do_commit, 'failed commit',
                          committer='Test Committer <test@nodomain.com>',
                          author='Test Author <test@nodomain.com>',
                          commit_timestamp=12345, commit_timezone=0,
                          author_timestamp=12345, author_timezone=0)
        new_shas = set(r.object_store) - old_shas
        self.assertEqual(1, len(new_shas))
        # Check that the new commit (now garbage) was added.
        new_commit = r[new_shas.pop()]
        self.assertEqual(r[self._root_commit].tree, new_commit.tree)
        self.assertEqual('failed commit', new_commit.message)

    def test_stage_deleted(self):
        r = self._repo
        os.remove(os.path.join(r.path, 'a'))
        r.stage(['a'])
        r.stage(['a'])  # double-stage a deleted path
427
428
class CheckRefFormatTests(unittest.TestCase):
    """Tests for the check_ref_format function.

    These are the same tests as in the git test suite.
    """

    def test_valid(self):
        self.assertTrue(check_ref_format('heads/foo'))
        self.assertTrue(check_ref_format('foo/bar/baz'))
        self.assertTrue(check_ref_format('refs///heads/foo'))
        self.assertTrue(check_ref_format('foo./bar'))
        self.assertTrue(check_ref_format('heads/foo@bar'))
        self.assertTrue(check_ref_format('heads/fix.lock.error'))

    def test_invalid(self):
        self.assertFalse(check_ref_format('foo'))
        self.assertFalse(check_ref_format('heads/foo/'))
        self.assertFalse(check_ref_format('./foo'))
        self.assertFalse(check_ref_format('.refs/foo'))
        self.assertFalse(check_ref_format('heads/foo..bar'))
        self.assertFalse(check_ref_format('heads/foo?bar'))
        self.assertFalse(check_ref_format('heads/foo.lock'))
        self.assertFalse(check_ref_format('heads/v@{ation'))
        # NOTE: '\b' in a non-raw string is a backspace control character,
        # not a backslash followed by 'b'.
        self.assertFalse(check_ref_format('heads/foo\bar'))
        # The git test suite case uses a literal backslash; check it too.
        self.assertFalse(check_ref_format('heads/foo\\bar'))
453
454
# Forty-character strings made of a single repeated digit; the packed-refs
# tests below use them as stand-ins for SHA values.
ONES, TWOS, THREES, FOURS = (digit * 40 for digit in "1234")
459
class PackedRefsFileTests(unittest.TestCase):
    """Tests for reading and writing the packed-refs file format."""

    def test_split_ref_line_errors(self):
        self.assertRaises(errors.PackedRefsException, _split_ref_line,
                          'singlefield')
        self.assertRaises(errors.PackedRefsException, _split_ref_line,
                          'badsha name')
        self.assertRaises(errors.PackedRefsException, _split_ref_line,
                          '%s bad/../refname' % ONES)

    def test_read_without_peeled(self):
        f = StringIO('# comment\n%s ref/1\n%s ref/2' % (ONES, TWOS))
        self.assertEqual([(ONES, 'ref/1'), (TWOS, 'ref/2')],
                         list(read_packed_refs(f)))

    def test_read_without_peeled_errors(self):
        # The plain reader must reject any peeled ('^') line.
        f = StringIO('%s ref/1\n^%s' % (ONES, TWOS))
        self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))

    def test_read_with_peeled(self):
        f = StringIO('%s ref/1\n%s ref/2\n^%s\n%s ref/4' % (
          ONES, TWOS, THREES, FOURS))
        self.assertEqual([
          (ONES, 'ref/1', None),
          (TWOS, 'ref/2', THREES),
          (FOURS, 'ref/4', None),
          ], list(read_packed_refs_with_peeled(f)))

    def test_read_with_peeled_errors(self):
        # Exercise the peeled-aware reader here; the plain reader rejects
        # every '^' line and would make these checks trivially pass.

        # A peeled line with no preceding ref line.
        f = StringIO('^%s\n%s ref/1' % (TWOS, ONES))
        self.assertRaises(errors.PackedRefsException, list,
                          read_packed_refs_with_peeled(f))

        # Two consecutive peeled lines for a single ref.
        f = StringIO('%s ref/1\n^%s\n^%s' % (ONES, TWOS, THREES))
        self.assertRaises(errors.PackedRefsException, list,
                          read_packed_refs_with_peeled(f))

    def test_write_with_peeled(self):
        f = StringIO()
        write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS},
                          {'ref/1': THREES})
        self.assertEqual(
          "# pack-refs with: peeled\n%s ref/1\n^%s\n%s ref/2\n" % (
          ONES, THREES, TWOS), f.getvalue())

    def test_write_without_peeled(self):
        f = StringIO()
        write_packed_refs(f, {'ref/1': ONES, 'ref/2': TWOS})
        self.assertEqual("%s ref/1\n%s ref/2\n" % (ONES, TWOS), f.getvalue())
507
508
# Dict of refs that we expect all RefsContainerTests subclasses to define.
# Keys are ref names and values are the hex SHAs they resolve to; HEAD,
# refs/heads/master and refs/heads/packed all map to the same SHA.
_TEST_REFS = {
  'HEAD': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  'refs/heads/master': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
  'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
  'refs/tags/refs-0.2': '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
  }
517
518
class RefsContainerTests(object):
    """Checks shared by every refs container test case.

    This class is a mixin: concrete test cases combine it with
    ``unittest.TestCase`` and provide ``self._refs``, a refs container that
    holds the entries of ``_TEST_REFS``.
    """

    def test_keys(self):
        all_keys = set(self._refs.keys())
        self.assertEqual(set(self._refs.allkeys()), all_keys)
        # ignore the symref loop if it exists
        all_keys.discard('refs/heads/loop')
        self.assertEqual(set(_TEST_REFS.iterkeys()), all_keys)

        # Keys requested below a base are reported relative to that base.
        head_keys = self._refs.keys('refs/heads')
        head_keys.discard('loop')
        self.assertEqual(['master', 'packed'], sorted(head_keys))
        tag_keys = self._refs.keys('refs/tags')
        self.assertEqual(['refs-0.1', 'refs-0.2'], sorted(tag_keys))

    def test_as_dict(self):
        # refs/heads/loop does not show up even if it exists
        refs_dict = self._refs.as_dict()
        self.assertEqual(_TEST_REFS, refs_dict)

    def test_setitem(self):
        sha = '42d06bd4b77fed026b154d16493e5deab78f02ec'
        self._refs['refs/some/ref'] = sha
        self.assertEqual(sha, self._refs['refs/some/ref'])

    def test_set_if_equals(self):
        master_sha = '42d06bd4b77fed026b154d16493e5deab78f02ec'
        nines = '9' * 40

        # A wrong old value leaves the ref untouched.
        changed = self._refs.set_if_equals('HEAD', 'c0ffee', nines)
        self.assertFalse(changed)
        self.assertEqual(master_sha, self._refs['HEAD'])

        # The matching old value lets the update through.
        changed = self._refs.set_if_equals('HEAD', master_sha, nines)
        self.assertTrue(changed)
        self.assertEqual(nines, self._refs['HEAD'])

        # None as the old value is accepted as well.
        changed = self._refs.set_if_equals('refs/heads/master', None, nines)
        self.assertTrue(changed)
        self.assertEqual(nines, self._refs['refs/heads/master'])

    def test_add_if_new(self):
        master_sha = '42d06bd4b77fed026b154d16493e5deab78f02ec'
        nines = '9' * 40

        # An existing ref is neither replaced nor reported as added.
        added = self._refs.add_if_new('refs/heads/master', nines)
        self.assertFalse(added)
        self.assertEqual(master_sha, self._refs['refs/heads/master'])

        added = self._refs.add_if_new('refs/some/ref', nines)
        self.assertTrue(added)
        self.assertEqual(nines, self._refs['refs/some/ref'])

    def test_set_symbolic_ref(self):
        symref = 'refs/heads/symbolic'
        self._refs.set_symbolic_ref(symref, 'refs/heads/master')
        # The loose ref stores the pointer, item access resolves it.
        self.assertEqual('ref: refs/heads/master',
                         self._refs.read_loose_ref(symref))
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                         self._refs[symref])

    def test_set_symbolic_ref_overwrite(self):
        symref = 'refs/heads/symbolic'
        nines = '9' * 40
        self.assertFalse(symref in self._refs)

        # Start out as a regular ref ...
        self._refs[symref] = nines
        self.assertEqual(nines, self._refs.read_loose_ref(symref))

        # ... then turn it into a symbolic one.
        self._refs.set_symbolic_ref(symref, 'refs/heads/master')
        self.assertEqual('ref: refs/heads/master',
                         self._refs.read_loose_ref(symref))
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                         self._refs[symref])

    def test_check_refname(self):
        # Acceptable names must not raise.
        for valid_name in ('HEAD', 'refs/heads/foo'):
            try:
                self._refs._check_refname(valid_name)
            except KeyError:
                self.fail()

        # Unacceptable names are reported with KeyError.
        for invalid_name in ('refs', 'notrefs/foo'):
            self.assertRaises(KeyError, self._refs._check_refname,
                              invalid_name)

    def test_contains(self):
        present = 'refs/heads/master' in self._refs
        self.assertTrue(present)
        absent = 'refs/heads/bar' in self._refs
        self.assertFalse(absent)

    def test_delitem(self):
        master_sha = '42d06bd4b77fed026b154d16493e5deab78f02ec'
        self.assertEqual(master_sha, self._refs['refs/heads/master'])
        del self._refs['refs/heads/master']
        self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])

    def test_remove_if_equals(self):
        # A mismatching value must not remove anything.
        removed = self._refs.remove_if_equals('HEAD', 'c0ffee')
        self.assertFalse(removed)
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                         self._refs['HEAD'])

        removed = self._refs.remove_if_equals(
          'refs/tags/refs-0.2', '3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8')
        self.assertTrue(removed)
        self.assertFalse('refs/tags/refs-0.2' in self._refs)
615
616
class DictRefsContainerTests(RefsContainerTests, unittest.TestCase):
    """Runs the shared refs container checks against DictRefsContainer."""

    def setUp(self):
        # Hand the container its own copy so that tests which modify refs
        # never touch the module-level _TEST_REFS.
        initial_refs = _TEST_REFS.copy()
        self._refs = DictRefsContainer(initial_refs)
621
622
class DiskRefsContainerTests(RefsContainerTests, unittest.TestCase):
    """Runs the shared refs container checks against an on-disk repository.

    The container under test is the ``refs`` attribute of the 'refs.git'
    fixture; several tests additionally inspect the files below
    ``self._refs.path`` directly.
    """

    def setUp(self):
        self._repo = open_repo('refs.git')
        self._refs = self._repo.refs

    def tearDown(self):
        tear_down_repo(self._repo)

    def test_get_packed_refs(self):
        self.assertEqual({
          'refs/heads/packed': '42d06bd4b77fed026b154d16493e5deab78f02ec',
          'refs/tags/refs-0.1': 'df6800012397fb85c56e7418dd4eb9405dee075c',
          }, self._refs.get_packed_refs())

    def test_get_peeled_not_packed(self):
        # not packed
        self.assertEqual(None, self._refs.get_peeled('refs/tags/refs-0.2'))
        self.assertEqual('3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8',
                         self._refs['refs/tags/refs-0.2'])

        # packed, known not peelable
        self.assertEqual(self._refs['refs/heads/packed'],
                         self._refs.get_peeled('refs/heads/packed'))

        # packed, peeled
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                         self._refs.get_peeled('refs/tags/refs-0.1'))

    def test_setitem(self):
        RefsContainerTests.test_setitem(self)
        # The new ref must also have been written out as a loose ref file.
        f = open(os.path.join(self._refs.path, 'refs', 'some', 'ref'), 'rb')
        try:
            self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                             f.read()[:40])
        finally:
            f.close()

    def test_setitem_symbolic(self):
        ones = '1' * 40
        self._refs['HEAD'] = ones
        self.assertEqual(ones, self._refs['HEAD'])

        # ensure HEAD was not modified
        f = open(os.path.join(self._refs.path, 'HEAD'), 'rb')
        try:
            self.assertEqual('ref: refs/heads/master',
                             f.readline().rstrip('\n'))
        finally:
            f.close()

        # ensure the symbolic link was written through
        f = open(os.path.join(self._refs.path, 'refs', 'heads', 'master'), 'rb')
        try:
            self.assertEqual(ones, f.read()[:40])
        finally:
            f.close()

    def test_set_if_equals(self):
        RefsContainerTests.test_set_if_equals(self)

        # ensure symref was followed
        self.assertEqual('9' * 40, self._refs['refs/heads/master'])

        # ensure lockfile was deleted
        self.assertFalse(os.path.exists(
          os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
        self.assertFalse(os.path.exists(
          os.path.join(self._refs.path, 'HEAD.lock')))

    def test_add_if_new_packed(self):
        # don't overwrite packed ref
        self.assertFalse(self._refs.add_if_new('refs/tags/refs-0.1', '9' * 40))
        self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
                         self._refs['refs/tags/refs-0.1'])

    def test_add_if_new_symbolic(self):
        # Use an empty repo instead of the default.
        tear_down_repo(self._repo)
        repo_dir = os.path.join(tempfile.mkdtemp(), 'test')
        os.makedirs(repo_dir)
        self._repo = Repo.init(repo_dir)
        refs = self._repo.refs

        nines = '9' * 40
        self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
        self.assertFalse('refs/heads/master' in refs)
        # Adding through the symbolic HEAD creates the branch it points at
        # and leaves HEAD itself symbolic.
        self.assertTrue(refs.add_if_new('HEAD', nines))
        self.assertEqual('ref: refs/heads/master', refs.read_ref('HEAD'))
        self.assertEqual(nines, refs['HEAD'])
        self.assertEqual(nines, refs['refs/heads/master'])
        # A second add is refused and changes nothing.
        self.assertFalse(refs.add_if_new('HEAD', '1' * 40))
        self.assertEqual(nines, refs['HEAD'])
        self.assertEqual(nines, refs['refs/heads/master'])

    def test_follow(self):
        self.assertEqual(
          ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
          self._refs._follow('HEAD'))
        self.assertEqual(
          ('refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'),
          self._refs._follow('refs/heads/master'))
        self.assertRaises(KeyError, self._refs._follow, 'notrefs/foo')
        self.assertRaises(KeyError, self._refs._follow, 'refs/heads/loop')

    def test_delitem(self):
        RefsContainerTests.test_delitem(self)
        # The ref must be gone both as a loose file and from packed refs.
        ref_file = os.path.join(self._refs.path, 'refs', 'heads', 'master')
        self.assertFalse(os.path.exists(ref_file))
        self.assertFalse('refs/heads/master' in self._refs.get_packed_refs())

    def test_delitem_symbolic(self):
        self.assertEqual('ref: refs/heads/master',
                         self._refs.read_loose_ref('HEAD'))
        del self._refs['HEAD']
        self.assertRaises(KeyError, lambda: self._refs['HEAD'])
        # Only the symref itself is removed, not the ref it pointed at.
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
                         self._refs['refs/heads/master'])
        self.assertFalse(os.path.exists(os.path.join(self._refs.path, 'HEAD')))

    def test_remove_if_equals_symref(self):
        # HEAD is a symref, so shouldn't equal its dereferenced value
        self.assertFalse(self._refs.remove_if_equals(
          'HEAD', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
        self.assertTrue(self._refs.remove_if_equals(
          'refs/heads/master', '42d06bd4b77fed026b154d16493e5deab78f02ec'))
        self.assertRaises(KeyError, lambda: self._refs['refs/heads/master'])

        # HEAD is now a broken symref
        self.assertRaises(KeyError, lambda: self._refs['HEAD'])
        self.assertEqual('ref: refs/heads/master',
                         self._refs.read_loose_ref('HEAD'))

        self.assertFalse(os.path.exists(
            os.path.join(self._refs.path, 'refs', 'heads', 'master.lock')))
        self.assertFalse(os.path.exists(
            os.path.join(self._refs.path, 'HEAD.lock')))

    def test_remove_packed_without_peeled(self):
        refs_file = os.path.join(self._repo.path, 'packed-refs')
        f = open(refs_file)
        try:
            refs_data = f.read()
        finally:
            f.close()
        # Rewrite packed-refs without its comment ('#') and peeled ('^')
        # lines.
        f = open(refs_file, 'w')
        try:
            f.write('\n'.join(l for l in refs_data.split('\n')
                              if not l or l[0] not in '#^'))
        finally:
            f.close()
        # Re-open the repository so the rewritten file is what gets read.
        self._repo = Repo(self._repo.path)
        refs = self._repo.refs
        self.assertTrue(refs.remove_if_equals(
          'refs/heads/packed', '42d06bd4b77fed026b154d16493e5deab78f02ec'))

    def test_remove_if_equals_packed(self):
        # test removing ref that is only packed
        self.assertEqual('df6800012397fb85c56e7418dd4eb9405dee075c',
                         self._refs['refs/tags/refs-0.1'])
        self.assertTrue(
          self._refs.remove_if_equals('refs/tags/refs-0.1',
          'df6800012397fb85c56e7418dd4eb9405dee075c'))
        self.assertRaises(KeyError, lambda: self._refs['refs/tags/refs-0.1'])

    def test_read_ref(self):
        self.assertEqual('ref: refs/heads/master', self._refs.read_ref("HEAD"))
        self.assertEqual('42d06bd4b77fed026b154d16493e5deab78f02ec',
            self._refs.read_ref("refs/heads/packed"))
        self.assertEqual(None,
            self._refs.read_ref("nonexistant"))