def iter_sha1(iter):
+ """Return the hexdigest of the SHA1 over a set of names."""
sha1 = make_sha()
for name in iter:
sha1.update(name)
return mem, offset
+def load_pack_index(filename):
+    """Open a pack index file and wrap it in the class matching its format.
+
+    :param filename: Path to the index file
+    :return: A PackIndex2 when the file starts with the v2 magic and
+        declares version 2; a PackIndex1 when the magic is absent.
+    :raise KeyError: If the magic is present but the version is not 2.
+    """
+    # Index files are binary data, so avoid any text-mode translation.
+    f = open(filename, 'rb')
+    try:
+        if f.read(4) != '\377tOc':
+            # No magic header: fall back to the version 1 reader.
+            f.seek(0)
+            return PackIndex1(filename, file=f)
+        version = struct.unpack(">L", f.read(4))[0]
+        if version != 2:
+            raise KeyError("Unknown pack index format %d" % version)
+        # Rewind so the index object sees the file from the start.
+        f.seek(0)
+        return PackIndex2(filename, file=f)
+    except Exception:
+        # Ownership of f only passes to the index object on success; on
+        # any failure close it here instead of leaking the handle.
+        f.close()
+        raise
+
+
class PackIndex(object):
"""An index in to a packfile.
the start and end offset and then bisect in to find if the value is present.
"""
- def __init__(self, filename):
+ def __init__(self, filename, file=None):
"""Create a pack index object.
Provide it with the name of the index file to consider, and it will map
# Take the size now, so it can be checked each time we map the file to
# ensure that it hasn't changed.
self._size = os.path.getsize(filename)
- self._file = open(filename, 'r')
+ if file is None:
+ self._file = open(filename, 'r')
+ else:
+ self._file = file
self._contents, map_offset = simple_mmap(self._file, 0, self._size)
assert map_offset == 0
- if self._contents[:4] != '\377tOc':
- self.version = 1
- self._fan_out_table = self._read_fan_out_table(0)
- else:
- (self.version, ) = unpack_from(">L", self._contents, 4)
- assert self.version in (2,), "Version was %d" % self.version
- self._fan_out_table = self._read_fan_out_table(8)
- self._name_table_offset = 8 + 0x100 * 4
- self._crc32_table_offset = self._name_table_offset + 20 * len(self)
- self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
def __eq__(self, other):
- if type(self) != type(other):
+ if not isinstance(other, PackIndex):
return False
if self._fan_out_table != other._fan_out_table:
:return: Tuple with object name (SHA), offset in pack file and
CRC32 checksum (if known)."""
- if self.version == 1:
- (offset, name) = unpack_from(">L20s", self._contents,
- (0x100 * 4) + (i * 24))
- return (name, offset, None)
- else:
- return (self._unpack_name(i), self._unpack_offset(i),
- self._unpack_crc32_checksum(i))
+ raise NotImplementedError(self._unpack_entry)
def _unpack_name(self, i):
- if self.version == 1:
- offset = (0x100 * 4) + (i * 24) + 4
- else:
- offset = self._name_table_offset + i * 20
- return self._contents[offset:offset+20]
+ """Unpack the i-th name from the index file."""
+ raise NotImplementedError(self._unpack_name)
def _unpack_offset(self, i):
- if self.version == 1:
- offset = (0x100 * 4) + (i * 24)
- else:
- offset = self._pack_offset_table_offset + i * 4
- return unpack_from(">L", self._contents, offset)[0]
-
+ """Unpack the i-th object offset from the index file."""
+ raise NotImplementedError(self._unpack_offset)
+
def _unpack_crc32_checksum(self, i):
- if self.version == 1:
- return None
- else:
- return unpack_from(">L", self._contents,
- self._crc32_table_offset + i * 4)[0]
+ """Unpack the crc32 checksum for the i-th object from the index file."""
+ raise NotImplementedError(self._unpack_crc32_checksum)
def __iter__(self):
return imap(sha_to_hex, self._itersha())
yield self._unpack_name(i)
def objects_sha1(self):
+ """Return the hex SHA1 over all the shas of all objects in this pack.
+
+ :note: This is used for the filename of the pack.
+ """
return iter_sha1(self._itersha())
def iterentries(self):
return None
+class PackIndex1(PackIndex):
+    """Pack index stored in the version 1 layout.
+
+    As read by this class, the fan-out table starts at offset 0 and is
+    followed, at offset 0x100 * 4, by 24-byte records: a 4-byte big-endian
+    pack offset and then a 20-byte object name.
+    """
+
+    def __init__(self, filename, file=None):
+        PackIndex.__init__(self, filename, file)
+        self.version = 1
+        self._fan_out_table = self._read_fan_out_table(0)
+
+    def _unpack_entry(self, i):
+        """Return (name, offset, None) for the i-th record."""
+        record_start = (0x100 * 4) + (i * 24)
+        fields = unpack_from(">L20s", self._contents, record_start)
+        # The record stores the offset first; callers get the name first.
+        return (fields[1], fields[0], None)
+
+    def _unpack_name(self, i):
+        """Return the 20-byte name of the i-th record."""
+        # Skip the 4-byte offset that leads each record.
+        begin = (0x100 * 4) + (i * 24) + 4
+        end = begin + 20
+        return self._contents[begin:end]
+
+    def _unpack_offset(self, i):
+        """Return the pack offset of the i-th record."""
+        record_start = (0x100 * 4) + (i * 24)
+        (value, ) = unpack_from(">L", self._contents, record_start)
+        return value
+
+    def _unpack_crc32_checksum(self, i):
+        """Return None: this layout carries no per-object checksum."""
+        return None
+
+
+class PackIndex2(PackIndex):
+    """Version 2 Pack Index.
+
+    As read by this class, the file starts with a 4-byte magic and a 4-byte
+    big-endian version, followed by the fan-out table at offset 8, then a
+    table of 20-byte names, a table of 4-byte CRC32 values and a table of
+    4-byte pack offsets, each holding len(self) entries.
+    """
+
+    def __init__(self, filename, file=None):
+        """Map the index file and validate its header.
+
+        :param filename: Path to the index file
+        :param file: Optional already-open file object, handed on to
+            PackIndex.__init__
+        :raise AssertionError: If the magic or the version is not the
+            expected one
+        """
+        PackIndex.__init__(self, filename, file)
+        # Raised explicitly instead of via ``assert`` so the header checks
+        # still run when Python is started with -O.
+        if self._contents[:4] != '\377tOc':
+            raise AssertionError("Not a v2 pack index file")
+        (self.version, ) = unpack_from(">L", self._contents, 4)
+        if self.version != 2:
+            raise AssertionError("Version was %d" % self.version)
+        self._fan_out_table = self._read_fan_out_table(8)
+        # NOTE(review): len(self) presumably relies on the fan-out table,
+        # so the table offsets are computed only after it has been read.
+        self._name_table_offset = 8 + 0x100 * 4
+        self._crc32_table_offset = self._name_table_offset + 20 * len(self)
+        self._pack_offset_table_offset = self._crc32_table_offset + 4 * len(self)
+
+    def _unpack_entry(self, i):
+        """Return (name, offset, crc32) for the i-th object."""
+        return (self._unpack_name(i), self._unpack_offset(i),
+                self._unpack_crc32_checksum(i))
+
+    def _unpack_name(self, i):
+        """Return the 20-byte name of the i-th object."""
+        offset = self._name_table_offset + i * 20
+        return self._contents[offset:offset+20]
+
+    def _unpack_offset(self, i):
+        """Return the pack offset stored for the i-th object."""
+        offset = self._pack_offset_table_offset + i * 4
+        return unpack_from(">L", self._contents, offset)[0]
+
+    def _unpack_crc32_checksum(self, i):
+        """Return the CRC32 value stored for the i-th object."""
+        return unpack_from(">L", self._contents,
+                           self._crc32_table_offset + i * 4)[0]
+
+
+
def read_pack_header(f):
header = f.read(12)
assert header[:4] == "PACK"
@property
def idx(self):
if self._idx is None:
- self._idx = PackIndex(self._idx_path)
+ self._idx = load_pack_index(self._idx_path)
return self._idx
def close(self):
)
from dulwich.pack import (
Pack,
- PackIndex,
PackData,
apply_delta,
create_delta,
+ load_pack_index,
hex_to_sha,
read_zlib,
sha_to_hex,
def get_pack_index(self, sha):
"""Returns a PackIndex from the datadir with the given sha"""
- return PackIndex(os.path.join(self.datadir, 'pack-%s.idx' % sha))
+ return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
def get_pack_data(self, sha):
"""Returns a PackData object from the datadir with the given sha"""
def test_create_index_v1(self):
p = self.get_pack_data(pack1_sha)
p.create_index_v1("v1test.idx")
- idx1 = PackIndex("v1test.idx")
+ idx1 = load_pack_index("v1test.idx")
idx2 = self.get_pack_index(pack1_sha)
self.assertEquals(idx1, idx2)
def test_create_index_v2(self):
p = self.get_pack_data(pack1_sha)
p.create_index_v2("v2test.idx")
- idx1 = PackIndex("v2test.idx")
+ idx1 = load_pack_index("v2test.idx")
idx2 = self.get_pack_index(pack1_sha)
self.assertEquals(idx1, idx2)
def test_empty(self):
pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
self._write_fn("empty.idx", [], pack_checksum)
- idx = PackIndex("empty.idx")
+ idx = load_pack_index("empty.idx")
self.assertTrue(idx.check())
self.assertEquals(idx.get_pack_checksum(), pack_checksum)
self.assertEquals(0, len(idx))
my_entries = [('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 42)]
my_entries.sort()
self._write_fn("single.idx", my_entries, pack_checksum)
- idx = PackIndex("single.idx")
+ idx = load_pack_index("single.idx")
self.assertEquals(idx.version, self._expected_version)
self.assertTrue(idx.check())
self.assertEquals(idx.get_pack_checksum(), pack_checksum)