+ """An index into a packfile.
+
+ Given a sha id of an object a pack index can tell you the location in the
+ packfile of that object if it has it.
+
+ To do the lookup it opens the file, and indexes the first 256 4-byte groups
+ with the first byte of the sha id. The value in the four byte group indexed
+ is the end of the group that shares the same starting byte. Subtract one
+ from the starting byte and index again to find the start of the group.
+ The values are sorted by sha id within the group, so do the math to find
+ the start and end offset and then bisect in to find if the value is present.
+ """
+
def __init__(self, filename, file=None):
    """Create a pack index object.

    The index file is mapped as soon as the object is constructed, and the
    mapping is kept in ``self._contents``.

    :param filename: Name of the index file; also used to read its size.
    :param file: Optional already-open file object to map instead of
        opening ``filename``.
    """
    self._filename = filename
    # Record the size up front; it is the length handed to simple_mmap below.
    self._size = os.path.getsize(filename)
    if file is None:
        # A pack index is binary data, so it must be opened in binary mode;
        # text mode can alter the bytes on platforms that translate
        # newlines.
        self._file = open(filename, 'rb')
    else:
        self._file = file
    self._contents, map_offset = simple_mmap(self._file, 0, self._size)
    # The mapping was requested from offset 0, so the returned offset of the
    # data within the mapping is expected to be 0 as well.
    assert map_offset == 0
+
+ def __eq__(self, other):
+ if not isinstance(other, PackIndex):
+ return False
+
+ if self._fan_out_table != other._fan_out_table:
+ return False
+
+ for (name1, _, _), (name2, _, _) in izip(self.iterentries(), other.iterentries()):
+ if name1 != name2:
+ return False
+ return True
+
+ def close(self):
+ self._file.close()
+
+ def __len__(self):
+ """Return the number of entries in this pack index."""
+ return self._fan_out_table[-1]
+
+ def _unpack_entry(self, i):
+ """Unpack the i-th entry in the index file.
+
+ :return: Tuple with object name (SHA), offset in pack file and
+ CRC32 checksum (if known)."""
+ raise NotImplementedError(self._unpack_entry)
+
+ def _unpack_name(self, i):
+ """Unpack the i-th name from the index file."""
+ raise NotImplementedError(self._unpack_name)
+
+ def _unpack_offset(self, i):
+ """Unpack the i-th object offset from the index file."""
+ raise NotImplementedError(self._unpack_offset)
+
+ def _unpack_crc32_checksum(self, i):
+ """Unpack the crc32 checksum for the i-th object from the index file."""
+ raise NotImplementedError(self._unpack_crc32_checksum)
+
def __iter__(self):
    """Iterate over the entry names, each passed through sha_to_hex."""
    raw_names = self._itersha()
    return imap(sha_to_hex, raw_names)
+
+ def _itersha(self):
+ for i in range(len(self)):
+ yield self._unpack_name(i)
+
def objects_sha1(self):
    """Return the hex SHA1 over all the shas of all objects in this pack.

    The value is computed by ``iter_sha1`` over the names from ``_itersha``.

    :note: This is used for the filename of the pack.
    """
    all_names = self._itersha()
    return iter_sha1(all_names)
+
+ def iterentries(self):
+ """Iterate over the entries in this pack index.
+
+ Will yield tuples with object name, offset in packfile and crc32 checksum.
+ """
+ for i in range(len(self)):
+ yield self._unpack_entry(i)
+
+ def _read_fan_out_table(self, start_offset):
+ ret = []
+ for i in range(0x100):
+ ret.append(struct.unpack(">L", self._contents[start_offset+i*4:start_offset+(i+1)*4])[0])
+ return ret
+
+ def check(self):
+ """Check that the stored checksum matches the actual checksum."""
+ return self.calculate_checksum() == self.get_stored_checksum()
+
def calculate_checksum(self):
    """Return the digest of the index contents, excluding the last 20 bytes.

    The digest object is created by ``make_sha``; the trailing 20 bytes are
    left out because that is where the stored checksum lives (see
    ``get_stored_checksum``).
    """
    payload = self._contents[:-20]
    hasher = make_sha(payload)
    return hasher.digest()
+
+ def get_pack_checksum(self):
+ """Return the SHA1 checksum stored for the corresponding packfile."""
+ return str(self._contents[-40:-20])
+
+ def get_stored_checksum(self):
+ """Return the SHA1 checksum stored for this index."""
+ return str(self._contents[-20:])
+
+ def object_index(self, sha):
+ """Return the index in to the corresponding packfile for the object.
+
+ Given the name of an object it will return the offset that object lives
+ at within the corresponding pack file. If the pack file doesn't have the
+ object then None will be returned.
+ """
+ if len(sha) == 40:
+ sha = hex_to_sha(sha)
+ return self._object_index(sha)
+
+ def _object_index(self, sha):
+ """See object_index.
+
+ :param sha: A *binary* SHA string. (20 characters long)_
+ """
+ assert len(sha) == 20
+ idx = ord(sha[0])
+ if idx == 0:
+ start = 0
+ else:
+ start = self._fan_out_table[idx-1]
+ end = self._fan_out_table[idx]
+ i = bisect_find_sha(start, end, sha, self._unpack_name)
+ if i is None:
+ raise KeyError(sha)
+ return self._unpack_offset(i)
+