PARENT_ID = "parent"
AUTHOR_ID = "author"
COMMITTER_ID = "committer"
+OBJECT_ID = "object"
+TYPE_ID = "type"
+TAGGER_ID = "tagger"
def _decompress(string):
dcomp = zlib.decompressobj()
shafile._text = string
return shafile
+ def _parse_text(self):
+ """Grab the metadata attached to the tag"""
+ text = self._text
+ count = 0
+ assert text.startswith(OBJECT_ID), "Invalid tag object, " \
+ "must start with %s" % OBJECT_ID
+ count += len(OBJECT_ID)
+ assert text[count] == ' ', "Invalid tag object, " \
+ "%s must be followed by space not %s" % (OBJECT_ID, text[count])
+ count += 1
+ self._object_sha = text[count:count+40]
+ count += 40
+ assert text[count] == '\n', "Invalid tag object, " \
+ "%s sha must be followed by newline" % OBJECT_ID
+ count += 1
+ assert text[count:].startswith(TYPE_ID), "Invalid tag object, " \
+ "%s sha must be followed by %s" % (OBJECT_ID, TYPE_ID)
+ count += len(TYPE_ID)
+ assert text[count] == ' ', "Invalid tag object, " \
+ "%s must be followed by space not %s" % (TAG_ID, text[count])
+ count += 1
+ self._object_type = ""
+ while text[count] != '\n':
+ self._object_type += text[count]
+ count += 1
+ count += 1
+ assert self._object_type in (COMMIT_ID, BLOB_ID, TREE_ID, TAG_ID), "Invalid tag object, " \
+ "unexpected object type %s" % self._object_type
+ self._object_type = type_map[self._object_type]
+
+ assert text[count:].startswith(TAG_ID), "Invalid tag object, " \
+ "object type must be followed by %s" % (TAG_ID)
+ count += len(TAG_ID)
+ assert text[count] == ' ', "Invalid tag object, " \
+ "%s must be followed by space not %s" % (TAG_ID, text[count])
+ count += 1
+ self._name = ""
+ while text[count] != '\n':
+ self._name += text[count]
+ count += 1
+ count += 1
+
+ assert text[count:].startswith(TAGGER_ID), "Invalid tag object, " \
+ "%s must be followed by %s" % (TAG_ID, TAGGER_ID)
+ count += len(TAGGER_ID)
+ assert text[count] == ' ', "Invalid tag object, " \
+ "%s must be followed by space not %s" % (TAGGER_ID, text[count])
+ count += 1
+ self._tagger = ""
+ while text[count] != '>':
+ assert text[count] != '\n', "Malformed tagger information"
+ self._tagger += text[count]
+ count += 1
+ self._tagger += text[count]
+ count += 1
+ assert text[count] == ' ', "Invalid tag object, " \
+ "tagger information must be followed by space not %s" % text[count]
+ count += 1
+ self._tag_time = int(text[count:count+10])
+ while text[count] != '\n':
+ count += 1
+ count += 1
+ assert text[count] == '\n', "There must be a new line after the headers"
+ count += 1
+ self._message = text[count:]
+
+ @property
+ def object(self):
+ """Returns the object pointed by this tag, represented as a tuple(type, sha)"""
+ return (self._object_type, self._object_sha)
+
+ @property
+ def name(self):
+ """Returns the name of this tag"""
+ return self._name
+
+ @property
+ def tagger(self):
+ """Returns the name of the person who created this tag"""
+ return self._tagger
+
+ @property
+ def tag_time(self):
+ """Returns the creation timestamp of the tag.
+
+ Returns it as the number of seconds since the epoch"""
+ return self._tag_time
+
+ @property
+ def message(self):
+ """Returns the message attached to this tag"""
+ return self._message
+
class Tree(ShaFile):
"""A Git tree object"""
o = i1
for i in range(4):
- if o & 0x000000ff << i*8:
+ if o & 0xff << i*8:
scratch += chr(o >> i)
op |= 1 << i
s = i2 - i1
for i in range(2):
- if s & 0x000000ff << i*8:
+ if s & 0xff << i*8:
scratch += chr(s >> i)
op |= 1 << (4+i)
SYMREF = 'ref: '
-class Tag(object):
+class Tags(object):
- def __init__(self, name, ref):
- self.name = name
- self.ref = ref
+ def __init__(self, tagdir, tags):
+ self.tagdir = tagdir
+ self.tags = tags
+
+ def __getitem__(self, name):
+ return self.tags[name]
+
+ def __setitem__(self, name, ref):
+ self.tags[name] = ref
+ f = open(os.path.join(self.tagdir, name), 'wb')
+ try:
+ f.write("%s\n" % ref)
+ finally:
+ f.close()
+
+ def __len__(self):
+ return len(self.tags)
+
+ def iteritems(self):
+ for k in self.tags:
+ yield k, self[k]
class Repo(object):
else:
raise NotGitRepository(root)
self.path = root
- self.tags = [Tag(name, ref) for name, ref in self.get_tags().items()]
+ self.tags = Tags(self.tagdir(), self.get_tags())
self._object_store = None
def controldir(self):
os.remove(file)
return
+ def tagdir(self):
+ return os.path.join(self.controldir(), 'refs', 'tags')
+
def get_tags(self):
ret = {}
- for root, dirs, files in os.walk(os.path.join(self.controldir(), 'refs', 'tags')):
+ for root, dirs, files in os.walk(self.tagdir()):
for name in files:
ret[name] = self._get_ref(os.path.join(root, name))
return ret
--- /dev/null
+x\ 1m\8eMO\83@\18\84=ï¯x\8f\1a#Ý\85²\80Qã¶nIME\94¢ÁÛòÙíòQ
+mÊ¿\97ªG's\98L2O¦\17\ 5\18SzÑÄÛ,éÁ$1¥v\9e\9aqnaJb+¥¹0u3·m\e\e\19Mr\91
+a\12Ô\ f»\f\92¦ªd\8fú\91ÑÉ¢ÎÒs,²=°RB bYÂ\9d(¥Ö\9dãcQ Yjãæ\ 1\88n\10\1d\eØ!p\8d Æ\b7²\83Ñâ\8f\ 4# Ý\9c5ãîÒ\ 3ßõ!Xº\1e[\87ïü§G\1fÙ¾\93M}\vn}ð]8\12mª9pézád%ëÃé
+!É#Îf|ÎX´`ª\94íÉfóB½µÁKµ\1a\9cÃD%'\93Ís¡¾\1c¯öK¬÷¯CäÓ\9d\9a5\\85\ 4\99<a5\ e\17E£Dp\f\f\8bDÕdúéÈç-=n\8c\rºoKkø=ʽ§ÿn~\ 3þ6iM
\ No newline at end of file
from dulwich.objects import (Blob,
Tree,
Commit,
+ Tag
)
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
b_sha = '2969be3e8ee1c0222396a5611407e4769f14e54b'
c_sha = '954a536f7819d40e6f637f849ee187dd10066349'
tree_sha = '70c190eb48fa8bbb50ddc692a17b44cb781af7f6'
+tag_sha = '71033db03a03c6a36721efcf1968dd8f8e0cf023'
class BlobReadTests(unittest.TestCase):
"""Test decompression of blobs"""
def get_tree(self, sha):
return self.get_sha_file(Tree, 'trees', sha)
+ def get_tag(self, sha):
+ return self.get_sha_file(Tag, 'tags', sha)
+
def commit(self, sha):
return self.get_sha_file(Commit, 'commits', sha)
self.assertEqual(t.entries()[0], (33188, 'a', a_sha))
self.assertEqual(t.entries()[1], (33188, 'b', b_sha))
+ def test_read_tag_from_file(self):
+ t = self.get_tag(tag_sha)
+ self.assertEqual(t.object, (Commit, '51b668fd5bf7061b7d6fa525f88803e6cfadaa51'))
+ self.assertEqual(t.name,'signed')
+ self.assertEqual(t.tagger,'Ali Sabil <ali.sabil@gmail.com>')
+ self.assertEqual(t.tag_time, 1231203091)
+ self.assertEqual(t.message, 'This is a signed tag\n-----BEGIN PGP SIGNATURE-----\nVersion: GnuPG v1.4.9 (GNU/Linux)\n\niEYEABECAAYFAkliqx8ACgkQqSMmLy9u/kcx5ACfakZ9NnPl02tOyYP6pkBoEkU1\n5EcAn0UFgokaSvS371Ym/4W9iJj6vh3h\n=ql7y\n-----END PGP SIGNATURE-----\n')
+
+
def test_read_commit_from_file(self):
sha = '60dacdc733de308bb77bb76ce0fb0f9b44c9769e'
c = self.commit(sha)
'fb5b0425c7ce46959bec94d54b9a157645e114f5',
'f9e39b120c68182a4ba35349f832d0e4e61f485c'])
+ def test_get_tags_empty(self):
+ r = self.open_repo('ooo_merge')
+ self.assertEquals({}, r.get_tags())