$(PEP8) dulwich
style:
- $(FLAKE8) --exclude=build,.git
+ $(FLAKE8) --exclude=build,.git,build-pypy
before-push: check
git diff origin/master | $(PEP8) --diff
source_suffix = '.txt'
# The encoding of source files.
-#source_encoding = 'utf-8'
+# source_encoding = 'utf-8'
# The master toctree document.
master_doc = 'index'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
-#language = None
+# language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
-#today = ''
+# today = ''
# Else, today_fmt is used as the format for a strftime call.
-#today_fmt = '%B %d, %Y'
+# today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
-#unused_docs = []
+# unused_docs = []
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['build']
-# The reST default role (used for this markup: `text`) to use for all documents.
-#default_role = None
+# The reST default role (used for this markup: `text`) to use for all
+# documents.
+# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
-#add_function_parentheses = True
+# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
-#add_module_names = True
+# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
-#show_authors = False
+# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
-#modindex_common_prefix = []
+# modindex_common_prefix = []
-# -- Options for HTML output ---------------------------------------------------
+# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
-#html_theme = 'default'
+# html_theme = 'default'
html_theme = 'nature'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
-#html_theme_options = {}
+# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
html_theme_path = ['theme']
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
-#html_title = None
+# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
-#html_short_title = None
+# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
-#html_logo = None
+# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
-#html_favicon = None
+# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
-#html_last_updated_fmt = '%b %d, %Y'
+# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
-#html_use_smartypants = True
+# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
-#html_sidebars = {}
+# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
-#html_additional_pages = {}
+# html_additional_pages = {}
# If false, no module index is generated.
-#html_use_modindex = True
+# html_use_modindex = True
# If false, no index is generated.
-#html_use_index = True
+# html_use_index = True
# If true, the index is split into individual pages for each letter.
-#html_split_index = False
+# html_split_index = False
# If true, links to the reST sources are added to the pages.
-#html_show_sourcelink = True
+# html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
-#html_use_opensearch = ''
+# html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
-#html_file_suffix = ''
+# html_file_suffix = ''
# Output file base name for HTML help builder.
htmlhelp_basename = 'dulwichdoc'
-# -- Options for LaTeX output --------------------------------------------------
+# -- Options for LaTeX output ------------------------------------------------
# The paper size ('letter' or 'a4').
-#latex_paper_size = 'letter'
+# latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
-#latex_font_size = '10pt'
+# latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
-# (source start file, target name, title, author, documentclass [howto/manual]).
+# (source start file, target name, title, author, documentclass
+# [howto/manual]).
latex_documents = [
('index', 'dulwich.tex', u'dulwich Documentation',
u'Jelmer Vernooij', 'manual'),
# The name of an image file (relative to this directory) to place at the top of
# the title page.
-#latex_logo = None
+# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
-#latex_use_parts = False
+# latex_use_parts = False
# Additional stuff for the LaTeX preamble.
-#latex_preamble = ''
+# latex_preamble = ''
# Documents to append as an appendix to all manuals.
-#latex_appendices = []
+# latex_appendices = []
# If false, no module index is generated.
-#latex_use_modindex = True
+# latex_use_modindex = True
pdf_documents = [
('index', u'dulwich', u'Documentation for dulwich',
u'Jelmer Vernooij'),
]
-pdf_stylesheets = ['sphinx','kerning','a4']
+pdf_stylesheets = ['sphinx', 'kerning', 'a4']
pdf_break_level = 2
pdf_inline_footnotes = True
-
"""Turn a list of bytestrings into a file-like object.
This is similar to creating a `BytesIO` from a concatenation of the
- bytestring list, but saves memory by NOT creating one giant bytestring first::
+ bytestring list, but saves memory by NOT creating one giant bytestring
+ first::
- BytesIO(b''.join(list_of_bytestrings)) =~= ChunkedBytesIO(list_of_bytestrings)
+ BytesIO(b''.join(list_of_bytestrings)) =~= ChunkedBytesIO(
+ list_of_bytestrings)
"""
def __init__(self, contents):
self.contents = contents
try:
blob = store[entry.sha]
except KeyError:
- # Entry probably refers to a submodule, which we don't yet support.
+ # Entry probably refers to a submodule, which we don't yet
+ # support.
continue
data = ChunkedBytesIO(blob.chunked)
info = tarfile.TarInfo()
- info.name = entry_abspath.decode('ascii') # tarfile only works with ascii.
+ # tarfile only works with ascii.
+ info.name = entry_abspath.decode('ascii')
info.size = blob.raw_length()
info.mode = entry.mode
info.mtime = mtime
import paramiko.client
import threading
+
class _ParamikoWrapper(object):
STDERR_READ_N = 2048 # 2k
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
headers=token_header)
- self.base_path = str(
- posixpath.join(urlparse.urlparse(self.storage_url).path, self.root))
+ self.base_path = str(posixpath.join(
+ urlparse.urlparse(self.storage_url).path, self.root))
def swift_auth_v1(self):
self.user = self.user.replace(";", ":")
entries.sort()
pack_base_name = posixpath.join(
self.pack_dir,
- 'pack-' + iter_sha1(e[0] for e in entries).decode(sys.getfilesystemencoding()))
+ 'pack-' + iter_sha1(e[0] for e in entries).decode(
+ sys.getfilesystemencoding()))
self.scon.put_object(pack_base_name + '.pack', f)
# Write the index.
try:
import gevent
- import geventhttpclient
+ import geventhttpclient # noqa: F401
except ImportError:
print("gevent and geventhttpclient libraries are mandatory "
" for use the Swift backend.")
}
if len(sys.argv) < 2:
- print("Usage: %s <%s> [OPTIONS...]" % (sys.argv[0], "|".join(commands.keys())))
+ print("Usage: %s <%s> [OPTIONS...]" % (
+ sys.argv[0], "|".join(commands.keys())))
sys.exit(1)
cmd = sys.argv[1]
- if not cmd in commands:
+ if cmd not in commands:
print("No such subcommand: %s" % cmd)
sys.exit(1)
commands[cmd](sys.argv[2:])
+
if __name__ == '__main__':
main()
missing_libs = []
try:
- import gevent
+ import gevent # noqa:F401
except ImportError:
missing_libs.append("gevent")
try:
- import geventhttpclient
+ import geventhttpclient # noqa:F401
except ImportError:
missing_libs.append("geventhttpclient")
skipmsg = "Required libraries are not installed (%r)" % missing_libs
-skipIfPY3 = skipIf(sys.version_info[0] == 3, "SWIFT module not yet ported to python3.")
+skipIfPY3 = skipIf(sys.version_info[0] == 3,
+ "SWIFT module not yet ported to python3.")
if not missing_libs:
from dulwich.contrib import swift
data.extend([blob, tree, tag, cmt])
return data
+
@skipIf(missing_libs, skipmsg)
class FakeSwiftConnector(object):
def get_object_stat(self, name):
name = posixpath.join(self.root, name)
- if not name in self.store:
+ if name not in self.store:
return None
return {'content-length': len(self.store[name])}
head = odata[-1].id
peeled_sha = dict([(sha.object[1], sha.id)
for sha in odata if isinstance(sha, Tag)])
- get_tagged = lambda: peeled_sha
+
+ def get_tagged():
+ return peeled_sha
i = sos.iter_shas(sos.find_missing_objects([],
[head, ],
progress=None,
def setUp(self):
super(TestSwiftInfoRefsContainer, self).setUp()
- content = \
- b"22effb216e3a82f97da599b8885a6cadb488b4c5\trefs/heads/master\n" + \
- b"cca703b0e1399008b53a1a236d6b4584737649e4\trefs/heads/dev"
+ content = (
+ b"22effb216e3a82f97da599b8885a6cadb488b4c5\trefs/heads/master\n"
+ b"cca703b0e1399008b53a1a236d6b4584737649e4\trefs/heads/dev")
self.store = {'fakerepo/info/refs': content}
self.conf = swift.load_conf(file=StringIO(config_file %
def_config_file))
def test_create_root(self):
with patch('dulwich.contrib.swift.SwiftConnector.test_root_exists',
- lambda *args: None):
+ lambda *args: None):
with patch('geventhttpclient.HTTPClient.request',
- lambda *args: Response()):
+ lambda *args: Response()):
self.assertEqual(self.conn.create_root(), None)
def test_create_root_fails(self):
self.assertEqual(self.conn.get_object('a').read(), b'content')
with patch('geventhttpclient.HTTPClient.request',
lambda *args, **kwargs: Response(content=b'content')):
- self.assertEqual(self.conn.get_object('a', range='0-6'), b'content')
+ self.assertEqual(
+ self.conn.get_object('a', range='0-6'),
+ b'content')
def test_get_object_fails(self):
with patch('geventhttpclient.HTTPClient.request',
from gevent import monkey
monkey.patch_all()
-from dulwich import server
-from dulwich import repo
-from dulwich import index
-from dulwich import client
-from dulwich import objects
-from dulwich.contrib import swift
+from dulwich ( # noqa:E402
+ server,
+ repo,
+ index,
+ client,
+ objects,
+ )
+from dulwich.contrib import swift # noqa:E402
class DulwichServer():
files = ('testfile', 'testfile2', 'dir/testfile3')
i = 0
for f in files:
- file(os.path.join(self.temp_d, f), 'w').write("DATA %s" % i)
+ open(os.path.join(self.temp_d, f), 'w').write("DATA %s" % i)
i += 1
local_repo.stage(files)
local_repo.do_commit('Test commit', 'fbo@localhost',
files = ('testfile11', 'testfile22', 'test/testfile33')
i = 0
for f in files:
- file(os.path.join(self.temp_d, f), 'w').write("DATA %s" % i)
+ open(os.path.join(self.temp_d, f), 'w').write("DATA %s" % i)
i += 1
local_repo.stage(files)
local_repo.do_commit('Test commit', 'fbo@localhost',
self.got = got
self.extra = extra
if self.extra is None:
- Exception.__init__(self,
- "Checksum mismatch: Expected %s, got %s" % (expected, got))
+ Exception.__init__(
+ self, "Checksum mismatch: Expected %s, got %s" %
+ (expected, got))
else:
- Exception.__init__(self,
- "Checksum mismatch: Expected %s, got %s; %s" %
+ Exception.__init__(
+ self, "Checksum mismatch: Expected %s, got %s; %s" %
(expected, got, extra))
"""Hangup exception."""
def __init__(self):
- Exception.__init__(self,
- "The remote server unexpectedly closed the connection.")
+ Exception.__init__(
+ self, "The remote server unexpectedly closed the connection.")
class UnexpectedCommandError(GitProtocolError):
Tag,
)
from fastimport import __version__ as fastimport_version
-if fastimport_version <= (0, 9, 5) and sys.version_info[0] == 3 and sys.version_info[1] < 5:
+if (fastimport_version <= (0, 9, 5) and
+ sys.version_info[0] == 3 and sys.version_info[1] < 5):
raise ImportError("Older versions of fastimport don't support python3<3.5")
-from fastimport import (
+from fastimport import ( # noqa: E402
commands,
errors as fastimport_errors,
parser,
processor,
)
-import stat
+import stat # noqa: E402
def split_email(text):
self.outf.write(getattr(cmd, "__bytes__", cmd.__repr__)() + b"\n")
def _allocate_marker(self):
- self._marker_idx+=1
+ self._marker_idx += 1
return ("%d" % (self._marker_idx,)).encode('ascii')
def _export_blob(self, blob):
def _iter_files(self, base_tree, new_tree):
for ((old_path, new_path), (old_mode, new_mode),
- (old_hexsha, new_hexsha)) in \
+ (old_hexsha, new_hexsha)) in \
self.store.tree_changes(base_tree, new_tree):
if new_path is None:
yield commands.FileDeleteCommand(old_path)
merges = []
author, author_email = split_email(commit.author)
committer, committer_email = split_email(commit.committer)
- cmd = commands.CommitCommand(ref, marker,
+ cmd = commands.CommitCommand(
+ ref, marker,
(author, author_email, commit.author_time, commit.author_timezone),
(committer, committer_email, commit.commit_time,
commit.commit_timezone),
blob_id = blob.id
else:
assert filecmd.dataref.startswith(b":"), \
- "non-marker refs not supported yet (%r)" % filecmd.dataref
+ ("non-marker refs not supported yet (%r)" %
+ filecmd.dataref)
blob_id = self.markers[filecmd.dataref[1:]]
self._contents[filecmd.path] = (filecmd.mode, blob_id)
elif filecmd.name == b"filedelete":
self._contents = {}
else:
raise Exception("Command %s not supported" % filecmd.name)
- commit.tree = commit_tree(self.repo.object_store,
+ commit.tree = commit_tree(
+ self.repo.object_store,
((path, hexsha, mode) for (path, (mode, hexsha)) in
self._contents.items()))
if self.last_commit is not None:
import sys
import tempfile
+
def ensure_dir_exists(dirname):
"""Ensure a directory exists, creating if necessary."""
try:
(name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry
write_cache_time(f, ctime)
write_cache_time(f, mtime)
- flags = len(name) | (flags &~ 0x0fff)
- f.write(struct.pack(b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF, mode, uid, gid, size, hex_to_sha(sha), flags))
+ flags = len(name) | (flags & ~0x0fff)
+ f.write(struct.pack(
+ b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF,
+ mode, uid, gid, size, hex_to_sha(sha), flags))
f.write(name)
real_size = ((f.tell() - beginoffset + 8) & ~7)
f.write(b'\0' * ((beginoffset + real_size) - f.tell()))
def __getitem__(self, name):
"""Retrieve entry by relative path.
- :return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags)
+ :return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha,
+ flags)
"""
return self._byname[name]
:param object_store: Object store to use for retrieving tree contents
:param tree: SHA1 of the root tree
:param want_unchanged: Whether unchanged files should be reported
- :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha)
+ :return: Iterator over tuples with (oldpath, newpath), (oldmode,
+ newmode), (oldsha, newsha)
"""
def lookup_entry(path):
entry = self[path]
return entry.sha, entry.mode
- for (name, mode, sha) in changes_from_tree(self._byname.keys(),
- lookup_entry, object_store, tree,
+ for (name, mode, sha) in changes_from_tree(
+ self._byname.keys(), lookup_entry, object_store, tree,
want_unchanged=want_unchanged):
yield (name, mode, sha)
def changes_from_tree(names, lookup_entry, object_store, tree,
- want_unchanged=False):
+ want_unchanged=False):
"""Find the differences between the contents of a tree and
a working copy.
:param object_store: Non-empty object store holding tree contents
:param honor_filemode: An optional flag to honor core.filemode setting in
config file, default is core.filemode=True, change executable bit
- :param validate_path_element: Function to validate path elements to check out;
- default just refuses .git and .. directories.
+ :param validate_path_element: Function to validate path elements to check
+ out; default just refuses .git and .. directories.
:note:: existing index is wiped and contents are not merged
in a working dir. Suitable only for fresh clones.
self._pack_cache_time = os.stat(self.pack_dir).st_mtime
pack_files = set()
for name in pack_dir_contents:
- assert isinstance(name, basestring if sys.version_info[0] == 2 else str)
if name.startswith("pack-") and name.endswith(".pack"):
# verify that idx exists first (otherwise the pack was not yet
# fully written)
def __cmp__(self, other):
if not isinstance(other, ShaFile):
raise TypeError
- return cmp(self.id, other.id)
+ return cmp(self.id, other.id) # noqa: F821
class Blob(ShaFile):
chunks.append(git_line(
_TAGGER_HEADER, self._tagger,
str(self._tag_time).encode('ascii'),
- format_timezone(self._tag_timezone, self._tag_timezone_neg_utc)))
+ format_timezone(
+ self._tag_timezone, self._tag_timezone_neg_utc)))
if self._message is not None:
- chunks.append(b'\n') # To close headers
+ chunks.append(b'\n') # To close headers
chunks.append(self._message)
return chunks
else:
self._tagger = value[0:sep+1]
try:
- (timetext, timezonetext) = value[sep+2:].rsplit(b' ', 1)
+ (timetext, timezonetext) = (
+ value[sep+2:].rsplit(b' ', 1))
self._tag_time = int(timetext)
- self._tag_timezone, self._tag_timezone_neg_utc = \
- parse_timezone(timezonetext)
+ self._tag_timezone, self._tag_timezone_neg_utc = (
+ parse_timezone(timezonetext))
except ValueError as e:
raise ObjectFormatException(e)
elif field is None:
object = property(_get_object, _set_object)
name = serializable_property("name", "The name of this tag")
- tagger = serializable_property("tagger",
- "Returns the name of the person who created this tag")
- tag_time = serializable_property("tag_time",
- "The creation timestamp of the tag. As the number of seconds "
- "since the epoch")
- tag_timezone = serializable_property("tag_timezone",
- "The timezone that tag_time is in.")
+ tagger = serializable_property(
+ "tagger",
+ "Returns the name of the person who created this tag")
+ tag_time = serializable_property(
+ "tag_time",
+ "The creation timestamp of the tag. As the number of seconds "
+ "since the epoch")
+ tag_timezone = serializable_property(
+ "tag_timezone",
+ "The timezone that tag_time is in.")
message = serializable_property(
- "message", "The message attached to this tag")
+ "message", "The message attached to this tag")
class TreeEntry(namedtuple('TreeEntry', ['path', 'mode', 'sha'])):
:return: Serialized tree text as chunks
"""
for name, mode, hexsha in items:
- yield ("%04o" % mode).encode('ascii') + b' ' + name + b'\0' + hex_to_sha(hexsha)
+ yield (("%04o" % mode).encode('ascii') + b' ' + name +
+ b'\0' + hex_to_sha(hexsha))
def sorted_tree_items(entries, name_order):
except ValueError as e:
raise ObjectFormatException(e)
# TODO: list comprehension is for efficiency in the common (small)
- # case; if memory efficiency in the large case is a concern, use a genexp.
+ # case; if memory efficiency in the large case is a concern, use a
+ # genexp.
self._entries = dict([(n, (m, s)) for n, m, s in parsed_entries])
def check(self):
offset = -offset
else:
sign = '+'
- return ('%c%02d%02d' % (sign, offset / 3600, (offset / 60) % 60)).encode('ascii')
+ return ('%c%02d%02d' %
+ (sign, offset / 3600, (offset / 60) % 60)).encode('ascii')
def parse_commit(chunks):
elif field == _COMMITTER_HEADER:
committer, timetext, timezonetext = value.rsplit(b' ', 2)
commit_time = int(timetext)
- commit_info = (committer, commit_time, parse_timezone(timezonetext))
+ commit_info = (
+ committer, commit_time, parse_timezone(timezonetext))
elif field == _ENCODING_HEADER:
encoding = value
elif field == _MERGETAG_HEADER:
def _deserialize(self, chunks):
(self._tree, self._parents, author_info, commit_info, self._encoding,
- self._mergetag, self._gpgsig, self._message, self._extra) = (
+ self._mergetag, self._gpgsig, self._message, self._extra) = (
parse_commit(chunks))
- (self._author, self._author_time, (self._author_timezone,
- self._author_timezone_neg_utc)) = author_info
- (self._committer, self._commit_time, (self._commit_timezone,
- self._commit_timezone_neg_utc)) = commit_info
+ (self._author, self._author_time,
+ (self._author_timezone, self._author_timezone_neg_utc)) = author_info
+ (self._committer, self._commit_time,
+ (self._commit_timezone, self._commit_timezone_neg_utc)) = commit_info
def check(self):
"""Check this object for internal consistency.
def _serialize(self):
chunks = []
- tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
+ tree_bytes = (
+ self._tree.id if isinstance(self._tree, Tree) else self._tree)
chunks.append(git_line(_TREE_HEADER, tree_bytes))
for p in self._parents:
chunks.append(git_line(_PARENT_HEADER, p))
chunks.append(git_line(
- _AUTHOR_HEADER, self._author, str(self._author_time).encode('ascii'),
- format_timezone(self._author_timezone,
- self._author_timezone_neg_utc)))
+ _AUTHOR_HEADER, self._author,
+ str(self._author_time).encode('ascii'),
+ format_timezone(
+ self._author_timezone, self._author_timezone_neg_utc)))
chunks.append(git_line(
- _COMMITTER_HEADER, self._committer, str(self._commit_time).encode('ascii'),
+ _COMMITTER_HEADER, self._committer,
+ str(self._commit_time).encode('ascii'),
format_timezone(self._commit_timezone,
self._commit_timezone_neg_utc)))
if self.encoding:
"""Return extra settings of this commit."""
return self._extra
- extra = property(_get_extra,
+ extra = property(
+ _get_extra,
doc="Extra header fields not understood (presumably added in a "
"newer version of git). Kept verbatim so the object can "
"be correctly reserialized. For private commit metadata, use "
"pseudo-headers in Commit.message, rather than this field.")
- author = serializable_property("author",
+ author = serializable_property(
+ "author",
"The name of the author of the commit")
- committer = serializable_property("committer",
+ committer = serializable_property(
+ "committer",
"The name of the committer of the commit")
message = serializable_property(
"message", "The commit message")
- commit_time = serializable_property("commit_time",
- "The timestamp of the commit. As the number of seconds since the epoch.")
+ commit_time = serializable_property(
+ "commit_time",
+ "The timestamp of the commit. As the number of seconds since the "
+ "epoch.")
- commit_timezone = serializable_property("commit_timezone",
+ commit_timezone = serializable_property(
+ "commit_timezone",
"The zone the commit time is in")
- author_time = serializable_property("author_time",
+ author_time = serializable_property(
+ "author_time",
"The timestamp the commit was written. As the number of "
"seconds since the epoch.")
rh = parse_ref(rh_container, rh)
except KeyError:
# TODO: check force?
- if not b"/" in rh:
+ if b"/" not in rh:
rh = b"refs/heads/" + rh
return (lh, rh, force)
:raise ValueError: If the range can not be parsed
"""
committish = to_bytes(committish)
- return repo[committish] # For now..
+ return repo[committish] # For now..
# TODO: parse_path_in_tree(), which handles e.g. v1.0:Documentation
import os
import sys
+from hashlib import sha1
+from os import (
+ SEEK_CUR,
+ SEEK_END,
+ )
+from struct import unpack_from
+import zlib
+
try:
import mmap
except ImportError:
if sys.platform == 'Plan9':
has_mmap = False
-from hashlib import sha1
-from os import (
- SEEK_CUR,
- SEEK_END,
- )
-from struct import unpack_from
-import zlib
-
-from dulwich.errors import (
+from dulwich.errors import ( # noqa: E402
ApplyDeltaError,
ChecksumMismatch,
)
-from dulwich.file import GitFile
-from dulwich.lru_cache import (
+from dulwich.file import GitFile # noqa: E402
+from dulwich.lru_cache import ( # noqa: E402
LRUSizeCache,
)
-from dulwich.objects import (
+from dulwich.objects import ( # noqa: E402
ShaFile,
hex_to_sha,
sha_to_hex,
offset = self._pack_offset_table_offset + i * 4
offset = unpack_from('>L', self._contents, offset)[0]
if offset & (2**31):
- offset = self._pack_offset_largetable_offset + (offset&(2**31-1)) * 8
+ offset = (
+ self._pack_offset_largetable_offset +
+ (offset & (2 ** 31 - 1)) * 8)
offset = unpack_from('>Q', self._contents, offset)[0]
return offset
if base_type == OFS_DELTA:
(delta_offset, delta) = base_obj
# TODO: clean up asserts and replace with nicer error messages
- assert (
- isinstance(base_offset, int)
- or isinstance(base_offset, long))
- assert (
- isinstance(delta_offset, int)
- or isinstance(base_offset, long))
base_offset = base_offset - delta_offset
base_type, base_obj = self.get_object_at(base_offset)
assert isinstance(base_type, int)
class SHA1Reader(object):
- """Wrapper around a file-like object that remembers the SHA1 of its data."""
+ """Wrapper for file-like object that remembers the SHA1 of its data."""
def __init__(self, f):
self.f = f
class SHA1Writer(object):
- """Wrapper around a file-like object that remembers the SHA1 of its data."""
+ """Wrapper for file-like object that remembers the SHA1 of its data."""
def __init__(self, f):
self.f = f
try:
- from dulwich._pack import apply_delta, bisect_find_sha
+ from dulwich._pack import apply_delta, bisect_find_sha # noqa: F811
except ImportError:
pass
FIRST_FEW_BYTES = 8000
-def write_commit_patch(f, commit, contents, progress, version=None, encoding=None):
+def write_commit_patch(f, commit, contents, progress, version=None,
+ encoding=None):
"""Write a individual file patch.
:param commit: Commit object
if isinstance(contents, str):
contents = contents.encode(encoding)
(num, total) = progress
- f.write(b"From " + commit.id + b" " + time.ctime(commit.commit_time).encode(encoding) + b"\n")
+ f.write(b"From " + commit.id + b" " +
+ time.ctime(commit.commit_time).encode(encoding) + b"\n")
f.write(b"From: " + commit.author + b"\n")
- f.write(b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n")
- f.write(("Subject: [PATCH %d/%d] " % (num, total)).encode(encoding) + commit.message + b"\n")
+ f.write(b"Date: " +
+ time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n")
+ f.write(("Subject: [PATCH %d/%d] " % (num, total)).encode(encoding) +
+ commit.message + b"\n")
f.write(b"\n")
f.write(b"---\n")
try:
p = subprocess.Popen(["diffstat"], stdout=subprocess.PIPE,
stdin=subprocess.PIPE)
except (ImportError, OSError):
- pass # diffstat not available?
+ pass # diffstat not available?
else:
(diffstat, _) = p.communicate(contents)
f.write(diffstat)
(new_path, new_mode, new_id) = new_file
old_path = patch_filename(old_path, b"a")
new_path = patch_filename(new_path, b"b")
+
def content(mode, hexsha):
if hexsha is None:
return Blob.from_string(b'')
new_content = content(new_mode, new_id)
if not diff_binary and (
is_binary(old_content.data) or is_binary(new_content.data)):
- f.write(b"Binary files " + old_path + b" and " + new_path + b" differ\n")
+ f.write(b"Binary files " + old_path + b" and " + new_path +
+ b" differ\n")
else:
f.writelines(unified_diff(lines(old_content), lines(new_content),
- old_path, new_path))
+ old_path, new_path))
# TODO(jelmer): Support writing unicode, rather than bytes.
(new_path, new_mode, new_blob) = new_file
old_path = patch_filename(old_path, b"a")
new_path = patch_filename(new_path, b"b")
+
def lines(blob):
if blob is not None:
return blob.splitlines()
old_contents = lines(old_blob)
new_contents = lines(new_blob)
f.writelines(unified_diff(old_contents, new_contents,
- old_path, new_path))
+ old_path, new_path))
# TODO(jelmer): Support writing unicode, rather than bytes.
changes = store.tree_changes(old_tree, new_tree)
for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
write_object_diff(f, store, (oldpath, oldmode, oldsha),
- (newpath, newmode, newsha),
- diff_binary=diff_binary)
+ (newpath, newmode, newsha), diff_binary=diff_binary)
def git_am_patch_split(f, encoding=None):
"""
encoding = encoding or getattr(f, "encoding", "ascii")
contents = f.read()
- if isinstance(contents, bytes) and getattr(email.parser, "BytesParser", None):
+ if (isinstance(contents, bytes) and
+ getattr(email.parser, "BytesParser", None)):
parser = email.parser.BytesParser()
msg = parser.parsebytes(contents)
else:
"""
with open_repo_closing(repo) as r:
if not paths:
- paths = list(get_untracked_paths(os.getcwd(), r.path,
- r.open_index()))
+ paths = list(
+ get_untracked_paths(os.getcwd(), r.path, r.open_index()))
# TODO(jelmer): Possibly allow passing in absolute paths?
relpaths = []
if not isinstance(paths, list):
# Perform 'git checkout .' - syncs staged changes
tree = r[b"HEAD"].tree
- r.reset_index()
+ r.reset_index(tree=tree)
def status(repo="."):
Parts of the git wire protocol use 'pkt-lines' to communicate. A pkt-line
consists of the length of the line as a 4-byte hex string, followed by the
- payload data. The length includes the 4-byte header. The special line '0000'
- indicates the end of a section of input and is called a 'flush-pkt'.
+ payload data. The length includes the 4-byte header. The special line
+ '0000' indicates the end of a section of input and is called a 'flush-pkt'.
For details on the pkt-line format, see the cgit distribution:
Documentation/technical/protocol-common.txt
else:
if len(pkt_contents) + 4 != size:
raise GitProtocolError(
- 'Length of pkt read %04x does not match length prefix %04x' % (len(pkt_contents) + 4, size))
+ 'Length of pkt read %04x does not match length prefix %04x'
+ % (len(pkt_contents) + 4, size))
return pkt_contents
def eof(self):
"""Test whether the protocol stream has reached EOF.
- Note that this refers to the actual stream EOF and not just a flush-pkt.
+ Note that this refers to the actual stream EOF and not just a
+ flush-pkt.
:return: True if the stream is at EOF, False otherwise.
"""
def read_pkt_seq(self):
"""Read a sequence of pkt-lines from the remote git process.
- :return: Yields each line of data up to but not including the next flush-pkt.
+ :return: Yields each line of data up to but not including the next
+ flush-pkt.
"""
pkt = self.read_pkt_line()
while pkt:
to a read() method.
If you want to read n bytes from the wire and block until exactly n bytes
- (or EOF) are read, use read(n). If you want to read at most n bytes from the
- wire but don't care if you get less, use recv(n). Note that recv(n) will
- still block until at least one byte is read.
+ (or EOF) are read, use read(n). If you want to read at most n bytes from
+ the wire but don't care if you get less, use recv(n). Note that recv(n)
+ will still block until at least one byte is read.
"""
def __init__(self, recv, write, report_activity=None, rbufsize=_RBUFSIZE):
# - seek back to start rather than 0 in case some buffer has been
# consumed.
# - use SEEK_END instead of the magic number.
- # Copyright (c) 2001-2010 Python Software Foundation; All Rights Reserved
+ # Copyright (c) 2001-2010 Python Software Foundation; All Rights
+ # Reserved
# Licensed under the Python Software Foundation License.
# TODO: see if buffer is more efficient than cBytesIO.
assert size > 0
buf.write(data)
buf_len += n
del data # explicit free
- #assert buf_len == buf.tell()
+ # assert buf_len == buf.tell()
buf.seek(start)
return buf.read()
:param text: String to extract from
:return: Tuple with text with capabilities removed and list of capabilities
"""
- if not b"\0" in text:
+ if b"\0" not in text:
return text, []
text, capabilities = text.rstrip().split(b"\0")
return (text, capabilities.strip().split(b" "))
class BufferedPktLineWriter(object):
"""Writer that wraps its data in pkt-lines and has an independent buffer.
- Consecutive calls to write() wrap the data in a pkt-line and then buffers it
- until enough lines have been written such that their total length (including
- length prefix) reach the buffer size.
+ Consecutive calls to write() wrap the data in a pkt-line and then buffer
+ it until enough lines have been written such that their total length
+ (including length prefix) reaches the buffer size.
"""
def __init__(self, write, bufsize=65515):
Implements all the same rules as git-check-ref-format[1].
- [1] http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
+ [1]
+ http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
:param refname: The refname to check
:return: True if refname is valid, False otherwise
"""
- # These could be combined into one big expression, but are listed separately
- # to parallel [1].
+ # These could be combined into one big expression, but are listed
+ # separately to parallel [1].
if b'/.' in refname or refname.startswith(b'.'):
return False
if b'/' not in refname:
"""Return the cached peeled value of a ref, if available.
:param name: Name of the ref to peel
- :return: The peeled value of the ref. If the ref is known not point to a
- tag, this will be the SHA the ref refers to. If the ref may point to
- a tag, but no cached information is available, None is returned.
+ :return: The peeled value of the ref. If the ref is known not point to
+ a tag, this will be the SHA the ref refers to. If the ref may point
+ to a tag, but no cached information is available, None is returned.
"""
return None
def _follow(self, name):
import warnings
warnings.warn(
- "RefsContainer._follow is deprecated. Use RefsContainer.follow instead.",
- DeprecationWarning)
+ "RefsContainer._follow is deprecated. Use RefsContainer.follow "
+ "instead.", DeprecationWarning)
refnames, contents = self.follow(name)
if not refnames:
return (None, contents)
operation.
:param name: The refname to delete.
- :param old_ref: The old sha the refname must refer to, or None to delete
- unconditionally.
+ :param old_ref: The old sha the refname must refer to, or None to
+ delete unconditionally.
:return: True if the delete was successful, False otherwise.
"""
raise NotImplementedError(self.remove_if_equals)
for root, dirs, files in os.walk(self.refpath(b'refs')):
dir = root[len(path):].strip(os.path.sep).replace(os.path.sep, "/")
for filename in files:
- refname = ("%s/%s" % (dir, filename)).encode(sys.getfilesystemencoding())
+ refname = (
+ "%s/%s" % (dir, filename)).encode(
+ sys.getfilesystemencoding())
if check_ref_format(refname):
allkeys.add(refname)
allkeys.update(self.get_packed_refs())
"""Return the disk path of a ref.
"""
- if getattr(self.path, "encode", None) and getattr(name, "decode", None):
+ if (getattr(self.path, "encode", None) and
+ getattr(name, "decode", None)):
name = name.decode(sys.getfilesystemencoding())
if os.path.sep != "/":
name = name.replace("/", os.path.sep)
"""Return the cached peeled value of a ref, if available.
:param name: Name of the ref to peel
- :return: The peeled value of the ref. If the ref is known not point to a
- tag, this will be the SHA the ref refers to. If the ref may point to
- a tag, but no cached information is available, None is returned.
+ :return: The peeled value of the ref. If the ref is known not point to
+ a tag, this will be the SHA the ref refers to. If the ref may point
+ to a tag, but no cached information is available, None is returned.
"""
self.get_packed_refs()
if self._peeled_refs is None or name not in self._packed_refs:
# read again while holding the lock
orig_ref = self.read_loose_ref(realname)
if orig_ref is None:
- orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
+ orig_ref = self.get_packed_refs().get(
+ realname, ZERO_SHA)
if orig_ref != old_ref:
f.abort()
return False
perform an atomic compare-and-delete operation.
:param name: The refname to delete.
- :param old_ref: The old sha the refname must refer to, or None to delete
- unconditionally.
+ :param old_ref: The old sha the refname must refer to, or None to
+ delete unconditionally.
:return: True if the delete was successful, False otherwise.
"""
self._check_refname(name)
yield peeled.id + b'\t' + name + ANNOTATED_TAG_SUFFIX + b'\n'
-is_local_branch = lambda x: x.startswith(b'refs/heads/')
+def is_local_branch(x):
+ return x.startswith(b'refs/heads/')
CommitMsgShellHook,
)
-from dulwich.refs import (
+from dulwich.refs import ( # noqa: F401
check_ref_format,
RefsContainer,
DictRefsContainer,
from dulwich.pack import (
write_pack_objects,
)
-from dulwich.protocol import (
+from dulwich.protocol import ( # noqa: F401
BufferedPktLineWriter,
capability_agent,
CAPABILITIES_REF,
Yield the objects required for a list of commits.
:param progress: is a callback to send progress messages to the client
- :param get_tagged: Function that returns a dict of pointed-to sha -> tag
- sha for including tags.
+ :param get_tagged: Function that returns a dict of pointed-to sha ->
+ tag sha for including tags.
"""
raise NotImplementedError
class FileSystemBackend(Backend):
- """Simple backend that looks up Git repositories in the local file system."""
+ """Simple backend looking up Git repositories in the local file system."""
def __init__(self, root=os.sep):
super(FileSystemBackend, self).__init__()
- self.root = (os.path.abspath(root) + os.sep).replace(os.sep * 2, os.sep)
+ self.root = (os.path.abspath(root) + os.sep).replace(
+ os.sep * 2, os.sep)
def open_repository(self, path):
logger.debug('opening repository at %s', path)
normcase_abspath = os.path.normcase(abspath)
normcase_root = os.path.normcase(self.root)
if not normcase_abspath.startswith(normcase_root):
- raise NotGitRepository("Path %r not inside root %r" % (path, self.root))
+ raise NotGitRepository(
+ "Path %r not inside root %r" %
+ (path, self.root))
return Repo(abspath)
self._done_received = True
-
class UploadPackHandler(PackHandler):
"""Protocol handler for uploading a pack to the client."""
def __init__(self, backend, args, proto, http_req=None,
advertise_refs=False):
- super(UploadPackHandler, self).__init__(backend, proto,
- http_req=http_req)
+ super(UploadPackHandler, self).__init__(
+ backend, proto, http_req=http_req)
self.repo = backend.open_repository(args[0])
self._graph_walker = None
self.advertise_refs = advertise_refs
@classmethod
def required_capabilities(cls):
- return (CAPABILITY_SIDE_BAND_64K, CAPABILITY_THIN_PACK, CAPABILITY_OFS_DELTA)
+ return (CAPABILITY_SIDE_BAND_64K, CAPABILITY_THIN_PACK,
+ CAPABILITY_OFS_DELTA)
def progress(self, message):
- if self.has_capability(CAPABILITY_NO_PROGRESS) or self._processing_have_lines:
+ if (self.has_capability(CAPABILITY_NO_PROGRESS) or
+ self._processing_have_lines):
return
self.proto.write_sideband(SIDE_BAND_CHANNEL_PROGRESS, message)
def get_tagged(self, refs=None, repo=None):
"""Get a dict of peeled values of tags to their original tag shas.
- :param refs: dict of refname -> sha of possible tags; defaults to all of
- the backend's refs.
- :param repo: optional Repo instance for getting peeled refs; defaults to
- the backend's repo, if available
+ :param refs: dict of refname -> sha of possible tags; defaults to all
+ of the backend's refs.
+ :param repo: optional Repo instance for getting peeled refs; defaults
+ to the backend's repo, if available
:return: dict of peeled_sha -> tag_sha, where tag_sha is the sha of a
tag whose peeled value is peeled_sha.
"""
return tagged
def handle(self):
- write = lambda x: self.proto.write_sideband(SIDE_BAND_CHANNEL_DATA, x)
+ def write(x):
+ return self.proto.write_sideband(SIDE_BAND_CHANNEL_DATA, x)
- graph_walker = ProtocolGraphWalker(self, self.repo.object_store,
- self.repo.get_peeled)
+ graph_walker = ProtocolGraphWalker(
+ self, self.repo.object_store, self.repo.get_peeled)
objects_iter = self.repo.fetch_objects(
graph_walker.determine_wants, graph_walker, self.progress,
get_tagged=self.get_tagged)
self._processing_have_lines = False
if not graph_walker.handle_done(
- not self.has_capability(CAPABILITY_NO_DONE), self._done_received):
+ not self.has_capability(CAPABILITY_NO_DONE),
+ self._done_received):
return
self.progress(b"dul-daemon says what\n")
- self.progress(("counting objects: %d, done.\n" % len(objects_iter)).encode('ascii'))
+ self.progress(
+ ("counting objects: %d, done.\n" % len(objects_iter)).encode(
+ 'ascii'))
write_pack_objects(ProtocolFile(None, write), objects_iter)
self.progress(b"how was that, then?\n")
# we are done
these sets may overlap if a commit is reachable along multiple paths.
"""
parents = {}
+
def get_parents(sha):
result = parents.get(sha, None)
if not result:
The work of determining which acks to send is passed on to the
implementation instance stored in _impl. The reason for this is that we do
not know at object creation time what ack level the protocol requires. A
- call to set_ack_level() is required to set up the implementation, before any
- calls to next() or ack() are made.
+ call to set_ack_level() is required to set up the implementation, before
+ any calls to next() or ack() are made.
"""
def __init__(self, handler, object_store, get_peeled):
self.handler = handler
if self.http_req and self.proto.eof():
# The client may close the socket at this point, expecting a
- # flush-pkt from the server. We might be ready to send a packfile at
- # this point, so we need to explicitly short-circuit in this case.
+ # flush-pkt from the server. We might be ready to send a packfile
+ # at this point, so we need to explicitly short-circuit in this
+ # case.
return []
return want_revs
def _handle_shallow_request(self, wants):
while True:
- command, val = self.read_proto_line((COMMAND_DEEPEN, COMMAND_SHALLOW))
+ command, val = self.read_proto_line(
+ (COMMAND_DEEPEN, COMMAND_SHALLOW))
if command == COMMAND_DEEPEN:
depth = val
break
def __init__(self, backend, args, proto, http_req=None,
advertise_refs=False):
- super(ReceivePackHandler, self).__init__(backend, proto,
- http_req=http_req)
+ super(ReceivePackHandler, self).__init__(
+ backend, proto, http_req=http_req)
self.repo = backend.open_repository(args[0])
self.advertise_refs = advertise_refs
@classmethod
def capabilities(cls):
- return (CAPABILITY_REPORT_STATUS, CAPABILITY_DELETE_REFS, CAPABILITY_QUIET,
- CAPABILITY_OFS_DELTA, CAPABILITY_SIDE_BAND_64K, CAPABILITY_NO_DONE)
+ return (CAPABILITY_REPORT_STATUS, CAPABILITY_DELETE_REFS,
+ CAPABILITY_QUIET, CAPABILITY_OFS_DELTA,
+ CAPABILITY_SIDE_BAND_64K, CAPABILITY_NO_DONE)
def _apply_pack(self, refs):
all_exceptions = (IOError, OSError, ChecksumMismatch, ApplyDeltaError,
will_send_pack = True
if will_send_pack:
- # TODO: more informative error messages than just the exception string
+ # TODO: more informative error messages than just the exception
+ # string
try:
recv = getattr(self.proto, "recv", None)
self.repo.object_store.add_thin_pack(self.proto.read, recv)
status.append((b'unpack', b'ok'))
except all_exceptions as e:
status.append((b'unpack', str(e).replace('\n', '')))
- # The pack may still have been moved in, but it may contain broken
- # objects. We trust a later GC to clean it up.
+ # The pack may still have been moved in, but it may contain
+ # broken objects. We trust a later GC to clean it up.
else:
- # The git protocol want to find a status entry related to unpack process
- # even if no pack data has been sent.
+ # The git protocol wants to find a status entry related to unpack
+ # process even if no pack data has been sent.
status.append((b'unpack', b'ok'))
for oldsha, sha, ref in refs:
ref_status = b'ok'
try:
if sha == ZERO_SHA:
- if not CAPABILITY_DELETE_REFS in self.capabilities():
+ if CAPABILITY_DELETE_REFS not in self.capabilities():
raise GitProtocolError(
'Attempted to delete refs without delete-refs '
'capability.')
self.proto.write_pkt_line(None)
else:
write = self.proto.write_pkt_line
- flush = lambda: None
+
+ def flush():
+ pass
for name, msg in status:
if name == b'unpack':
DEFAULT_HANDLERS = {
b'git-upload-pack': UploadPackHandler,
b'git-receive-pack': ReceivePackHandler,
-# b'git-upload-archive': UploadArchiveHandler,
- }
+ # b'git-upload-archive': UploadArchiveHandler,
+}
class TCPGitRequestHandler(SocketServer.StreamRequestHandler):
if handlers is not None:
self.handlers.update(handlers)
self.backend = backend
- logger.info('Listening for TCP connections on %s:%d', listen_addr, port)
+ logger.info('Listening for TCP connections on %s:%d',
+ listen_addr, port)
SocketServer.TCPServer.__init__(self, (listen_addr, port),
self._make_handler)
outf=sys.stdout):
"""Serve a single command.
- This is mostly useful for the implementation of commands used by e.g. git+ssh.
+ This is mostly useful for the implementation of commands used by e.g.
+ git+ssh.
:param handler_cls: `Handler` class to use for the request
:param argv: execv-style command-line arguments. Defaults to sys.argv.
"""
if backend is None:
backend = FileSystemBackend()
+
def send_fn(data):
outf.write(data)
outf.flush()
def generate_objects_info_packs(repo):
"""Generate an index for for packs."""
for pack in repo.object_store.packs:
- yield b'P ' + pack.data.filename.encode(sys.getfilesystemencoding()) + b'\n'
+ yield (
+ b'P ' + pack.data.filename.encode(sys.getfilesystemencoding()) +
+ b'\n')
def update_server_info(repo):
This generates info/refs and objects/info/packs,
similar to "git update-server-info".
"""
- repo._put_named_file(os.path.join('info', 'refs'),
+ repo._put_named_file(
+ os.path.join('info', 'refs'),
b"".join(generate_info_refs(repo)))
- repo._put_named_file(os.path.join('objects', 'info', 'packs'),
+ repo._put_named_file(
+ os.path.join('objects', 'info', 'packs'),
b"".join(generate_objects_info_packs(repo)))
# If Python itself provides an exception, use that
import unittest
-from unittest import SkipTest, TestCase as _TestCase, skipIf, expectedFailure
+from unittest import ( # noqa: F401
+ SkipTest,
+ TestCase as _TestCase,
+ skipIf,
+ expectedFailure,
+ )
class TestCase(_TestCase):
"""Blackbox testing."""
# TODO(jelmer): Include more possible binary paths.
- bin_directories = [os.path.abspath(os.path.join(os.path.dirname(__file__),
- "..", "..", "bin")), '/usr/bin', '/usr/local/bin']
+ bin_directories = [os.path.abspath(os.path.join(
+ os.path.dirname(__file__), "..", "..", "bin")), '/usr/bin',
+ '/usr/local/bin']
def bin_path(self, name):
"""Determine the full path of a binary.
#
# Save us from all that headache and call python with the bin script.
argv = [sys.executable, self.bin_path(name)] + args
- return subprocess.Popen(argv,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE, stderr=subprocess.PIPE,
- env=env)
+ return subprocess.Popen(
+ argv,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
def self_test_suite():
'conclusion',
]
tutorial_files = ["../../docs/tutorial/%s.txt" % name for name in tutorial]
+
def setup(test):
test.__old_cwd = os.getcwd()
test.__dulwich_tempdir = tempfile.mkdtemp()
os.chdir(test.__dulwich_tempdir)
+
def teardown(test):
os.chdir(test.__old_cwd)
shutil.rmtree(test.__dulwich_tempdir)
- return doctest.DocFileSuite(setUp=setup, tearDown=teardown,
- *tutorial_files)
+ return doctest.DocFileSuite(
+ setUp=setup, tearDown=teardown, *tutorial_files)
def nocompat_test_suite():
BaseHTTPServer = http.server
SimpleHTTPServer = http.server
-if sys.platform == 'win32':
- import ctypes
-
from dulwich import (
client,
errors,
)
+if sys.platform == 'win32':
+ import ctypes
+
+
class DulwichClientTestBase(object):
"""Tests for client/server compatibility."""
def setUp(self):
- self.gitroot = os.path.dirname(import_repo_to_dir('server_new.export').rstrip(os.sep))
+ self.gitroot = os.path.dirname(
+ import_repo_to_dir('server_new.export').rstrip(os.sep))
self.dest = os.path.join(self.gitroot, 'dest')
file.ensure_dir_exists(self.dest)
run_git_or_fail(['init', '--quiet', '--bare'], cwd=self.dest)
sendrefs, gen_pack = self.compute_send(src)
c = self._client()
try:
- c.send_pack(self._build_path('/dest'), lambda _: sendrefs, gen_pack)
+ c.send_pack(self._build_path('/dest'), lambda _: sendrefs,
+ gen_pack)
except errors.UpdateRefsError as e:
- self.assertIn(str(e),
- ['{0}, {1} failed to update'.format(
- branch.decode('ascii'), master.decode('ascii')),
- '{1}, {0} failed to update'.format(
- branch.decode('ascii'), master.decode('ascii'))])
+ self.assertIn(
+ str(e),
+ ['{0}, {1} failed to update'.format(
+ branch.decode('ascii'), master.decode('ascii')),
+ '{1}, {0} failed to update'.format(
+ branch.decode('ascii'), master.decode('ascii'))])
self.assertEqual({branch: b'non-fast-forward',
master: b'non-fast-forward'},
e.ref_status)
from io import BytesIO
from itertools import chain
import os
-import sys
import tempfile
from dulwich.objects import (
"""Tests for archive support."""
from io import BytesIO
-import sys
import tarfile
from dulwich.archive import tar_stream
def test_fetch_pack_ignores_magic_ref(self):
self.rin.write(
- b'00000000000000000000000000000000000000000000 capabilities^{}\x00 multi_ack '
+ b'00000000000000000000000000000000000000000000 capabilities^{}'
+ b'\x00 multi_ack '
b'thin-pack side-band side-band-64k ofs-delta shallow no-progress '
b'include-tag\n'
b'0000')
b"ng refs/foo/bar pre-receive hook declined",
b'']
for pkt in pkts:
- if pkt == b'':
+ if pkt == b'':
self.rin.write(b"0000")
else:
self.rin.write(("%04x" % (len(pkt)+4)).encode('ascii') + pkt)
self.rout.getvalue(),
[b'007f0000000000000000000000000000000000000000 ' + commit.id +
b' refs/heads/blah12\x00report-status ofs-delta0000' +
- f.getvalue(),
+ f.getvalue(),
b'007f0000000000000000000000000000000000000000 ' + commit.id +
b' refs/heads/blah12\x00ofs-delta report-status0000' +
- f.getvalue()])
+ f.getvalue()])
def test_send_pack_no_deleteref_delete_only(self):
pkts = [b'310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master'
self.client.alternative_paths[b'upload-pack'] = (
b'/usr/lib/git/git-upload-pack -ibla')
self.assertEqual(b"/usr/lib/git/git-upload-pack -ibla",
- self.client._get_cmd_path(b'upload-pack'))
+ self.client._get_cmd_path(b'upload-pack'))
def test_connect(self):
server = self.server
b'b0931cadc54336e78a1d980420e3268903b57a50'
}, ret)
self.assertEqual(
- b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
- b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e", out.getvalue())
+ b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08"
+ b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e",
+ out.getvalue())
def test_fetch_pack_none(self):
c = LocalGitClient()
lambda heads: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"],
graph_walker=walker, pack_data=out.write)
# Hardcoding is not ideal, but we'll fix that some other day..
- self.assertTrue(out.getvalue().startswith(b'PACK\x00\x00\x00\x02\x00\x00\x00\x07'))
+ self.assertTrue(out.getvalue().startswith(
+ b'PACK\x00\x00\x00\x02\x00\x00\x00\x07'))
def test_send_pack_without_changes(self):
local = open_repo('a.git')
self.assertDictEqual(local.refs.as_dict(), refs)
def send_and_verify(self, branch, local, target):
- """Send a branch from local to remote repository and verify it worked."""
+ """Send branch from local to remote repository and verify it worked."""
client = LocalGitClient()
ref_name = b"refs/heads/" + branch
new_refs = client.send_pack(target.path,
self.assertEqual('user', c._username)
self.assertEqual('passwd', c._password)
[pw_handler] = [
- h for h in c.opener.handlers if getattr(h, 'passwd', None) is not None]
+ h for h in c.opener.handlers
+ if getattr(h, 'passwd', None) is not None]
self.assertEqual(
('user', 'passwd'),
pw_handler.passwd.find_user_password(
self.assertIs(None, c._username)
self.assertIs(None, c._password)
pw_handler = [
- h for h in c.opener.handlers if getattr(h, 'passwd', None) is not None]
+ h for h in c.opener.handlers
+ if getattr(h, 'passwd', None) is not None]
self.assertEqual(0, len(pw_handler))
def test_from_parsedurl_on_url_with_quoted_credentials(self):
self.assertEqual(original_username, c._username)
self.assertEqual(original_password, c._password)
[pw_handler] = [
- h for h in c.opener.handlers if getattr(h, 'passwd', None) is not None]
+ h for h in c.opener.handlers
+ if getattr(h, 'passwd', None) is not None]
self.assertEqual(
(original_username, original_password),
pw_handler.passwd.find_user_password(
host = 'github.com'
path = '/jelmer/dulwich'
port = 9090
- c = TCPGitClient(host, port=9090)
+ c = TCPGitClient(host, port=port)
url = c.get_url(path)
self.assertEqual('git://github.com:9090/jelmer/dulwich', url)
"""Tests for reading and writing configuration files."""
from io import BytesIO
-import os
from dulwich.config import (
ConfigDict,
ConfigFile,
def test_default_config(self):
cf = self.from_file(b"""[core]
- repositoryformatversion = 0
- filemode = true
- bare = false
- logallrefupdates = true
+\trepositoryformatversion = 0
+\tfilemode = true
+\tbare = false
+\tlogallrefupdates = true
""")
self.assertEqual(ConfigFile({(b"core", ): {
b"repositoryformatversion": b"0",
self.assertEqual(b"barla", cf.get((b"core", ), b"foo"))
def test_from_file_with_open_quoted(self):
- self.assertRaises(ValueError,
- self.from_file, b"[core]\nfoo = \"bar\n")
+ self.assertRaises(ValueError, self.from_file, b"[core]\nfoo = \"bar\n")
def test_from_file_with_quotes(self):
cf = self.from_file(
self.assertEqual(b"bar", cf.get((b"branch", b"foo"), b"foo"))
def test_from_file_subsection_invalid(self):
- self.assertRaises(ValueError,
- self.from_file, b"[branch \"foo]\nfoo = bar\n")
+ self.assertRaises(
+ ValueError, self.from_file, b"[branch \"foo]\nfoo = bar\n")
def test_from_file_subsection_not_quoted(self):
cf = self.from_file(b"[branch.foo]\nfoo = bar\n")
def test_quoted(self):
cf = self.from_file(b"""[gui]
- fontdiff = -family \\\"Ubuntu Mono\\\" -size 11 -weight normal -slant roman -underline 0 -overstrike 0
+\tfontdiff = -family \\\"Ubuntu Mono\\\" -size 11 -overstrike 0
""")
self.assertEqual(ConfigFile({(b'gui', ): {
- b'fontdiff': b'-family "Ubuntu Mono" -size 11 -weight normal -slant roman -underline 0 -overstrike 0',
+ b'fontdiff': b'-family "Ubuntu Mono" -size 11 -overstrike 0',
}}), cf)
def test_quoted_multiline(self):
cf = self.from_file(b"""[alias]
who = \"!who() {\\
- git log --no-merges --pretty=format:'%an - %ae' $@ | sort | uniq -c | sort -rn;\\
+ git log --no-merges --pretty=format:'%an - %ae' $@ | uniq -c | sort -rn;\\
};\\
who\"
""")
self.assertEqual(ConfigFile({(b'alias', ): {
- b'who': b"!who() {git log --no-merges --pretty=format:'%an - %ae' $@ | sort | uniq -c | sort -rn;};who"}}), cf)
+ b'who': (b"!who() {git log --no-merges --pretty=format:'%an - "
+ b"%ae' $@ | uniq -c | sort -rn;};who")
+ }}), cf)
def test_set_hash_gets_quoted(self):
c = ConfigFile()
cd = ConfigDict()
cd.set((b"core2", ), b"foo", b"bloe")
- self.assertEqual([],
- list(cd.iteritems((b"core", ))))
+ self.assertEqual([], list(cd.iteritems((b"core", ))))
def test_itersections(self):
cd = ConfigDict()
cd.set((b"core2", ), b"foo", b"bloe")
- self.assertEqual([(b"core2", )],
- list(cd.itersections()))
+ self.assertEqual([(b"core2", )], list(cd.itersections()))
class StackedConfigTests(TestCase):
def testSubmodules(self):
cf = ConfigFile.from_file(BytesIO(b"""\
[submodule "core/lib"]
- path = core/lib
- url = https://github.com/phhusson/QuasselC.git
+\tpath = core/lib
+\turl = https://github.com/phhusson/QuasselC.git
"""))
got = list(parse_submodules(cf))
self.assertEqual([
- (b'core/lib', b'https://github.com/phhusson/QuasselC.git', b'core/lib')], got)
+ (b'core/lib', b'https://github.com/phhusson/QuasselC.git',
+ b'core/lib')], got)
self.assertChangesEqual([], self.empty_tree, self.empty_tree)
self.assertChangesEqual([], tree, tree)
self.assertChangesEqual(
- [TreeChange(CHANGE_UNCHANGED, (b'a', F, blob.id), (b'a', F, blob.id)),
+ [TreeChange(CHANGE_UNCHANGED, (b'a', F, blob.id),
+ (b'a', F, blob.id)),
TreeChange(CHANGE_UNCHANGED, (b'b/c', F, blob.id),
(b'b/c', F, blob.id))],
tree, tree, want_unchanged=True)
tree1 = self.commit_tree([(b'a', blob), (b'a.', blob), (b'a..', blob)])
# Tree order is the reverse of this, so if we used tree order, 'a..'
# would not be merged.
- tree2 = self.commit_tree([(b'a/x', blob), (b'a./x', blob), (b'a..', blob)])
+ tree2 = self.commit_tree(
+ [(b'a/x', blob), (b'a./x', blob), (b'a..', blob)])
self.assertChangesEqual(
[TreeChange.delete((b'a', F, blob.id)),
merge = self.commit_tree([(b'a', blob2)])
self.assertChangesForMergeEqual(
[[TreeChange.add((b'a', F, blob2.id)),
- TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id), (b'a', F, blob2.id))]],
+ TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id),
+ (b'a', F, blob2.id))]],
[parent1, parent2], merge)
def test_tree_changes_for_merge_modify_modify_conflict(self):
parent2 = self.commit_tree([(b'a', blob2)])
merge = self.commit_tree([(b'a', blob3)])
self.assertChangesForMergeEqual(
- [[TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id), (b'a', F, blob3.id)),
- TreeChange(CHANGE_MODIFY, (b'a', F, blob2.id), (b'a', F, blob3.id))]],
+ [[TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id),
+ (b'a', F, blob3.id)),
+ TreeChange(CHANGE_MODIFY, (b'a', F, blob2.id),
+ (b'a', F, blob3.id))]],
[parent1, parent2], merge)
def test_tree_changes_for_merge_modify_no_conflict(self):
parent2 = self.commit_tree([])
merge = self.commit_tree([(b'b', blob)])
add = TreeChange.add((b'b', F, blob.id))
- self.assertChangesForMergeEqual([[add, add]], [parent1, parent2], merge)
+ self.assertChangesForMergeEqual(
+ [[add, add]], [parent1, parent2], merge)
def test_tree_changes_for_merge_add_exact_rename_conflict(self):
blob = make_object(Blob, data=b'a\nb\nc\nd\n')
parent2 = self.commit_tree([])
merge = self.commit_tree([(b'b', blob)])
self.assertChangesForMergeEqual(
- [[TreeChange(CHANGE_RENAME, (b'a', F, blob.id), (b'b', F, blob.id)),
+ [[TreeChange(CHANGE_RENAME, (b'a', F, blob.id),
+ (b'b', F, blob.id)),
TreeChange.add((b'b', F, blob.id))]],
[parent1, parent2], merge, rename_detector=self.detector)
parent2 = self.commit_tree([])
merge = self.commit_tree([(b'b', blob2)])
self.assertChangesForMergeEqual(
- [[TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob2.id)),
+ [[TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob2.id)),
TreeChange.add((b'b', F, blob2.id))]],
[parent1, parent2], merge, rename_detector=self.detector)
parent2 = self.commit_tree([(b'b', blob1)])
merge = self.commit_tree([(b'b', blob2)])
self.assertChangesForMergeEqual(
- [[TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob2.id)),
- TreeChange(CHANGE_MODIFY, (b'b', F, blob1.id), (b'b', F, blob2.id))]],
+ [[TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob2.id)),
+ TreeChange(CHANGE_MODIFY, (b'b', F, blob1.id),
+ (b'b', F, blob2.id))]],
[parent1, parent2], merge, rename_detector=self.detector)
def _do_test_count_blocks(self, count_blocks):
blob = make_object(Blob, data=b'a\nb\na\n')
- self.assertEqual({hash(b'a\n'): 4, hash(b'b\n'): 2}, count_blocks(blob))
+ self.assertEqual({hash(b'a\n'): 4, hash(b'b\n'): 2},
+ count_blocks(blob))
test_count_blocks = functest_builder(_do_test_count_blocks,
_count_blocks_py)
def _do_test_count_blocks_chunks(self, count_blocks):
blob = ShaFile.from_raw_chunks(Blob.type_num, [b'a\nb', b'\na\n'])
- self.assertEqual({hash(b'a\n'): 4, hash(b'b\n'): 2}, _count_blocks(blob))
+ self.assertEqual({hash(b'a\n'): 4, hash(b'b\n'): 2},
+ _count_blocks(blob))
test_count_blocks_chunks = functest_builder(_do_test_count_blocks_chunks,
_count_blocks_py)
a = b'a' * 64
data = a + b'xxx\ny\n' + a + b'zzz\n'
blob = make_object(Blob, data=data)
- self.assertEqual({hash(b'a' * 64): 128, hash(b'xxx\n'): 4, hash(b'y\n'): 2,
- hash(b'zzz\n'): 4},
+ self.assertEqual({hash(b'a' * 64): 128, hash(b'xxx\n'): 4,
+ hash(b'y\n'): 2, hash(b'zzz\n'): 4},
_count_blocks(blob))
test_count_blocks_long_lines = functest_builder(
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'a', blob1), (b'b', blob3)])
self.assertEqual(
- [TreeChange(CHANGE_MODIFY, (b'b', F, blob2.id), (b'b', F, blob3.id))],
+ [TreeChange(CHANGE_MODIFY, (b'b', F, blob2.id),
+ (b'b', F, blob3.id))],
self.detect_renames(tree1, tree2))
def test_exact_rename_one_to_one(self):
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'c', blob1), (b'd', blob2)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'c', F, blob1.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob2.id), (b'd', F, blob2.id))],
- self.detect_renames(tree1, tree2))
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'c', F, blob1.id)),
+ TreeChange(CHANGE_RENAME, (b'b', F, blob2.id),
+ (b'd', F, blob2.id))],
+ self.detect_renames(tree1, tree2))
def test_exact_rename_split_different_type(self):
blob = make_object(Blob, data=b'/foo')
tree1 = self.commit_tree([(b'a', blob1)])
tree2 = self.commit_tree([(b'a', blob2, 0o120000), (b'b', blob1)])
self.assertEqual(
- [TreeChange.add((b'a', 0o120000, blob2.id)),
- TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob1.id))],
- self.detect_renames(tree1, tree2))
+ [TreeChange.add((b'a', 0o120000, blob2.id)),
+ TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob1.id))],
+ self.detect_renames(tree1, tree2))
def test_exact_rename_one_to_many(self):
blob = make_object(Blob, data=b'1')
tree1 = self.commit_tree([(b'a', blob), (b'b', blob)])
tree2 = self.commit_tree([(b'c', blob), (b'd', blob), (b'e', blob)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob.id), (b'c', F, blob.id)),
- TreeChange(CHANGE_COPY, (b'a', F, blob.id), (b'e', F, blob.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob.id), (b'd', F, blob.id))],
- self.detect_renames(tree1, tree2))
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob.id),
+ (b'c', F, blob.id)),
+ TreeChange(CHANGE_COPY, (b'a', F, blob.id),
+ (b'e', F, blob.id)),
+ TreeChange(CHANGE_RENAME, (b'b', F, blob.id),
+ (b'd', F, blob.id))],
+ self.detect_renames(tree1, tree2))
def test_exact_copy_modify(self):
blob1 = make_object(Blob, data=b'a\nb\nc\nd\n')
tree1 = self.commit_tree([(b'a', blob1)])
tree2 = self.commit_tree([(b'a', blob2), (b'b', blob1)])
self.assertEqual(
- [TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id), (b'a', F, blob2.id)),
- TreeChange(CHANGE_COPY, (b'a', F, blob1.id), (b'b', F, blob1.id))],
+ [TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id),
+ (b'a', F, blob2.id)),
+ TreeChange(CHANGE_COPY, (b'a', F, blob1.id),
+ (b'b', F, blob1.id))],
self.detect_renames(tree1, tree2))
def test_exact_copy_change_mode(self):
tree1 = self.commit_tree([(b'a', blob1)])
tree2 = self.commit_tree([(b'b', blob2)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob2.id))],
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob2.id))],
self.detect_renames(tree1, tree2, rename_threshold=50))
self.assertEqual(
[TreeChange.delete((b'a', F, blob1.id)),
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'c', blob3), (b'd', blob4)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'd', F, blob4.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob2.id), (b'c', F, blob3.id))],
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'd', F, blob4.id)),
+ TreeChange(CHANGE_RENAME, (b'b', F, blob2.id),
+ (b'c', F, blob3.id))],
self.detect_renames(tree1, tree2))
self.assertEqual(
[TreeChange.delete((b'a', F, blob1.id)),
tree2 = self.commit_tree([(b'c', blob3)])
self.assertEqual(
[TreeChange.delete((b'a', F, blob1.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob2.id), (b'c', F, blob3.id))],
+ TreeChange(CHANGE_RENAME, (b'b', F, blob2.id),
+ (b'c', F, blob3.id))],
self.detect_renames(tree1, tree2))
tree3 = self.commit_tree([(b'a', blob2), (b'b', blob1)])
tree4 = self.commit_tree([(b'c', blob3)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob2.id), (b'c', F, blob3.id)),
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob2.id),
+ (b'c', F, blob3.id)),
TreeChange.delete((b'b', F, blob1.id))],
self.detect_renames(tree3, tree4))
tree2 = self.commit_tree([(b'b', blob2), (b'c', blob3)])
self.assertEqual(
[TreeChange(CHANGE_COPY, (b'a', F, blob1.id), (b'b', F, blob2.id)),
- TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'c', F, blob3.id))],
+ TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'c', F, blob3.id))],
self.detect_renames(tree1, tree2))
def test_content_rename_many_to_one(self):
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'c', blob3)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'c', F, blob3.id)),
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'c', F, blob3.id)),
TreeChange.delete((b'b', F, blob2.id))],
self.detect_renames(tree1, tree2))
# TODO(dborowitz): Distribute renames rather than greedily choosing
# copies.
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'c', F, blob3.id)),
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'c', F, blob3.id)),
TreeChange(CHANGE_COPY, (b'a', F, blob1.id), (b'd', F, blob4.id)),
TreeChange.delete((b'b', F, blob2.id))],
self.detect_renames(tree1, tree2))
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'a', blob2), (b'b', blob1)])
self.assertEqual(
- [TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id), (b'a', F, blob2.id)),
- TreeChange(CHANGE_MODIFY, (b'b', F, blob2.id), (b'b', F, blob1.id))],
+ [TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id),
+ (b'a', F, blob2.id)),
+ TreeChange(CHANGE_MODIFY, (b'b', F, blob2.id),
+ (b'b', F, blob1.id))],
self.detect_renames(tree1, tree2))
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob1.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob2.id), (b'a', F, blob2.id))],
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob1.id)),
+ TreeChange(CHANGE_RENAME, (b'b', F, blob2.id),
+ (b'a', F, blob2.id))],
self.detect_renames(tree1, tree2, rewrite_threshold=50))
def test_content_rename_swap(self):
tree1 = self.commit_tree([(b'a', blob1), (b'b', blob2)])
tree2 = self.commit_tree([(b'a', blob4), (b'b', blob3)])
self.assertEqual(
- [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob3.id)),
- TreeChange(CHANGE_RENAME, (b'b', F, blob2.id), (b'a', F, blob4.id))],
+ [TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob3.id)),
+ TreeChange(CHANGE_RENAME, (b'b', F, blob2.id),
+ (b'a', F, blob4.id))],
self.detect_renames(tree1, tree2, rewrite_threshold=60))
def test_rewrite_threshold(self):
tree2 = self.commit_tree([(b'a', blob3), (b'b', blob2)])
no_renames = [
- TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id), (b'a', F, blob3.id)),
+ TreeChange(CHANGE_MODIFY, (b'a', F, blob1.id),
+ (b'a', F, blob3.id)),
TreeChange(CHANGE_COPY, (b'a', F, blob1.id), (b'b', F, blob2.id))]
self.assertEqual(
no_renames, self.detect_renames(tree1, tree2))
self.assertEqual(
- no_renames, self.detect_renames(tree1, tree2, rewrite_threshold=40))
+ no_renames, self.detect_renames(
+ tree1, tree2, rewrite_threshold=40))
self.assertEqual(
[TreeChange.add((b'a', F, blob3.id)),
- TreeChange(CHANGE_RENAME, (b'a', F, blob1.id), (b'b', F, blob2.id))],
+ TreeChange(CHANGE_RENAME, (b'a', F, blob1.id),
+ (b'b', F, blob2.id))],
self.detect_renames(tree1, tree2, rewrite_threshold=80))
def test_find_copies_harder_exact(self):
self.assertEqual([TreeChange.add((b'b', F, blob2.id))],
self.detect_renames(tree1, tree2))
self.assertEqual(
- [TreeChange(CHANGE_COPY, (b'a', F, blob1.id), (b'b', F, blob2.id))],
+ [TreeChange(CHANGE_COPY, (b'a', F, blob1.id),
+ (b'b', F, blob2.id))],
self.detect_renames(tree1, tree2, find_copies_harder=True))
def test_find_copies_harder_with_rewrites(self):
self.assertEqual(
[TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id),
(b'a', F, blob_a2.id)),
- TreeChange(CHANGE_COPY, (b'a', F, blob_a1.id), (b'b', F, blob_b2.id))],
+ TreeChange(CHANGE_COPY, (b'a', F, blob_a1.id),
+ (b'b', F, blob_b2.id))],
self.detect_renames(tree1, tree2, find_copies_harder=True))
self.assertEqual(
[TreeChange.add((b'a', F, blob_a2.id)),
b.data = b"fooBAR"
self.fastexporter.emit_blob(b)
self.assertEqual(b'blob\nmark :1\ndata 6\nfooBAR\n',
- self.stream.getvalue())
+ self.stream.getvalue())
def test_emit_commit(self):
b = Blob()
def test_commit_handler(self):
from fastimport import commands
- cmd = commands.CommitCommand(b"refs/heads/foo", b"mrkr",
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- b"FOO", None, [], [])
+ cmd = commands.CommitCommand(
+ b"refs/heads/foo", b"mrkr",
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ b"FOO", None, [], [])
self.processor.commit_handler(cmd)
commit = self.repo[self.processor.last_commit]
self.assertEqual(b"Jelmer <jelmer@samba.org>", commit.author)
from fastimport import commands
cmd = commands.BlobCommand(b"23", b"data")
self.processor.blob_handler(cmd)
- cmd = commands.CommitCommand(b"refs/heads/foo", b"mrkr",
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- b"FOO", None, [], [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
+ cmd = commands.CommitCommand(
+ b"refs/heads/foo", b"mrkr",
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ b"FOO", None, [],
+ [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
self.processor.commit_handler(cmd)
commit = self.repo[self.processor.last_commit]
self.assertEqual([
from fastimport import commands
cmd = commands.BlobCommand(b"23", b"data")
self.processor.blob_handler(cmd)
- cmd = commands.CommitCommand(b"refs/heads/foo", b"mrkr",
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- b"FOO", None, [], [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
+ cmd = commands.CommitCommand(
+ b"refs/heads/foo", b"mrkr",
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ b"FOO", None, [],
+ [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
self.processor.commit_handler(cmd)
commit = self.repo[self.processor.last_commit]
return commit
:return: The created commit object
"""
from fastimport import commands
- cmd = commands.CommitCommand(b"refs/heads/foo", b"mrkr",
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
- b"FOO", None, [], file_cmds)
+ cmd = commands.CommitCommand(
+ b"refs/heads/foo", b"mrkr",
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
+ b"FOO", None, [], file_cmds)
self.processor.commit_handler(cmd)
return self.repo[self.processor.last_commit]
def test_file_copy(self):
from fastimport import commands
self.simple_commit()
- commit = self.make_file_commit([commands.FileCopyCommand(b"path", b"new_path")])
+ commit = self.make_file_commit(
+ [commands.FileCopyCommand(b"path", b"new_path")])
self.assertEqual([
- (b'new_path', 0o100644, b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
- (b'path', 0o100644, b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
- ], self.repo[commit.tree].items())
+ (b'new_path', 0o100644,
+ b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
+ (b'path', 0o100644,
+ b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
+ ], self.repo[commit.tree].items())
def test_file_move(self):
from fastimport import commands
self.simple_commit()
- commit = self.make_file_commit([commands.FileRenameCommand(b"path", b"new_path")])
+ commit = self.make_file_commit(
+ [commands.FileRenameCommand(b"path", b"new_path")])
self.assertEqual([
- (b'new_path', 0o100644, b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
- ], self.repo[commit.tree].items())
+ (b'new_path', 0o100644,
+ b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
+ ], self.repo[commit.tree].items())
def test_file_delete(self):
from fastimport import commands
)
try:
- import gevent
+ import gevent # noqa: F401
gevent_support = True
except ImportError:
gevent_support = False
skipmsg = "Gevent library is not installed"
+
def create_commit(marker=None):
blob = Blob.from_string(b'The blob content ' + marker)
tree = Tree()
def test_len(self):
wants = [sha.id for sha in self.objs if isinstance(sha, Commit)]
finder = MissingObjectFinder(self.store, (), wants)
- iterator = GreenThreadsObjectStoreIterator(self.store,
- iter(finder.next, None),
- finder)
+ iterator = GreenThreadsObjectStoreIterator(
+ self.store, iter(finder.next, None), finder)
# One commit refers one tree and one blob
self.assertEqual(len(iterator), self.cmt_amount * 3)
haves = wants[0:self.cmt_amount-1]
!negative
with trailing whitespace
with escaped trailing whitespace\
-""")
+""") # noqa: W291
self.assertEqual(list(read_ignore_patterns(f)), [
b'\\#not a comment',
b'!negative',
f.write('/blie\n')
with open(os.path.join(repo.path, 'dir', 'blie'), 'w') as f:
f.write('IGNORED')
- with open(os.path.join(repo.controldir(), 'info', 'exclude'), 'w') as f:
+ p = os.path.join(repo.controldir(), 'info', 'exclude')
+ with open(p, 'w') as f:
f.write('/excluded\n')
m = IgnoreFilterManager.from_repo(repo)
self.assertTrue(m.is_ignored(os.path.join(repo.path, 'dir', 'blie')))
- self.assertIs(None, m.is_ignored(os.path.join(repo.path, 'dir', 'bloe')))
+ self.assertIs(None,
+ m.is_ignored(os.path.join(repo.path, 'dir', 'bloe')))
self.assertIs(None, m.is_ignored(os.path.join(repo.path, 'dir')))
self.assertTrue(m.is_ignored(os.path.join(repo.path, 'foo', 'bar')))
self.assertTrue(m.is_ignored(os.path.join(repo.path, 'excluded')))
skipIf,
)
+
class IndexTestCase(TestCase):
datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
self.assertEqual([b'bla'], list(self.get_simple_index("index")))
def test_getitem(self):
- self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
- 33188, 1000, 1000, 0,
- b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
- self.get_simple_index("index")[b"bla"])
+ self.assertEqual(
+ ((1230680220, 0), (1230680220, 0), 2050, 3761020,
+ 33188, 1000, 1000, 0,
+ b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
+ self.get_simple_index("index")[b"bla"])
def test_empty(self):
i = self.get_simple_index("notanindex")
self.assertEqual(b'bla', newname)
self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
+
class SimpleIndexWriterTestCase(IndexTestCase):
def setUp(self):
shutil.rmtree(self.tempdir)
def test_simple_write(self):
- entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
- 33188, 1000, 1000, 0,
- b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
+ entries = {
+ b'barbla':
+ ((1230680220, 0), (1230680220, 0), 2050, 3761020, 33188,
+ 1000, 1000, 0,
+ b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
filename = os.path.join(self.tempdir, 'test-simple-write-index')
with open(filename, 'wb+') as x:
write_index_dict(x, entries)
self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
self.assertEqual(set([rootid, dirid, blob.id]),
- set(self.store._data.keys()))
+ set(self.store._data.keys()))
class CleanupModeTests(TestCase):
class IndexEntryFromStatTests(TestCase):
def test_simple(self):
- st = os.stat_result((16877, 131078, 64769,
- 154, 1000, 1000, 12288,
- 1323629595, 1324180496, 1324180496))
+ st = os.stat_result(
+ (16877, 131078, 64769, 154, 1000, 1000, 12288,
+ 1323629595, 1324180496, 1324180496))
entry = index_entry_from_stat(st, "22" * 20, 0)
self.assertEqual(entry, (
1324180496,
0))
def test_override_mode(self):
- st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
- 154, 1000, 1000, 12288,
- 1323629595, 1324180496, 1324180496))
+ st = os.stat_result(
+ (stat.S_IFREG + 0o644, 131078, 64769,
+ 154, 1000, 1000, 12288,
+ 1323629595, 1324180496, 1324180496))
entry = index_entry_from_stat(
st, "22" * 20, 0, mode=stat.S_IFREG + 0o755)
self.assertEqual(entry, (
tree = Tree()
repo.object_store.add_object(tree)
- build_index_from_tree(repo.path, repo.index_path(),
+ build_index_from_tree(
+ repo.path, repo.index_path(),
repo.object_store, tree.id)
# Verify index entries
tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
- repo.object_store.add_objects([(o, None)
- for o in [filea, filee, tree]])
+ repo.object_store.add_objects(
+ [(o, None) for o in [filea, filee, tree]])
build_index_from_tree(
repo.path, repo.index_path(), repo.object_store, tree.id)
# filea
apath = os.path.join(repo.path, 'a')
self.assertTrue(os.path.exists(apath))
- self.assertReasonableIndexEntry(index[b'a'],
- stat.S_IFREG | 0o644, 6, filea.id)
+ self.assertReasonableIndexEntry(
+ index[b'a'], stat.S_IFREG | 0o644, 6, filea.id)
self.assertFileContents(apath, b'file a')
# fileb
bpath = os.path.join(repo.path, 'b')
self.assertTrue(os.path.exists(bpath))
- self.assertReasonableIndexEntry(index[b'b'],
- stat.S_IFREG | 0o644, 6, fileb.id)
+ self.assertReasonableIndexEntry(
+ index[b'b'], stat.S_IFREG | 0o644, 6, fileb.id)
self.assertFileContents(bpath, b'file b')
# filed
dpath = os.path.join(repo.path, 'c', 'd')
self.assertTrue(os.path.exists(dpath))
- self.assertReasonableIndexEntry(index[b'c/d'],
- stat.S_IFREG | 0o644, 6, filed.id)
+ self.assertReasonableIndexEntry(
+ index[b'c/d'], stat.S_IFREG | 0o644, 6, filed.id)
self.assertFileContents(dpath, b'file d')
# Verify no extra files
- self.assertEqual(['.git', 'a', 'b', 'c'],
- sorted(os.listdir(repo.path)))
- self.assertEqual(['d'],
- sorted(os.listdir(os.path.join(repo.path, 'c'))))
+ self.assertEqual(
+ ['.git', 'a', 'b', 'c'], sorted(os.listdir(repo.path)))
+ self.assertEqual(
+ ['d'], sorted(os.listdir(os.path.join(repo.path, 'c'))))
@skipIf(not getattr(os, 'sync', None), 'Requires sync support')
def test_norewrite(self):
tree = Tree()
tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
- repo.object_store.add_objects([(o, None)
- for o in [filea, tree]])
+ repo.object_store.add_objects([(o, None) for o in [filea, tree]])
# First Write
build_index_from_tree(repo.path, repo.index_path(),
with open(filea_path, 'rb') as fh:
self.assertEqual(b'file a', fh.read())
-
@skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
def test_symlink(self):
repo_dir = tempfile.mkdtemp()
tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink
- repo.object_store.add_objects([(o, None)
- for o in [filed, filee, tree]])
+ repo.object_store.add_objects(
+ [(o, None) for o in [filed, filee, tree]])
- build_index_from_tree(repo.path, repo.index_path(),
- repo.object_store, tree.id)
+ build_index_from_tree(
+ repo.path, repo.index_path(), repo.object_store, tree.id)
# Verify index entries
index = repo.open_index()
repo.object_store.add_objects(
[(o, None) for o in [tree]])
- build_index_from_tree(repo.path, repo.index_path(),
- repo.object_store, tree.id)
+ build_index_from_tree(
+ repo.path, repo.index_path(), repo.object_store, tree.id)
# Verify index entries
index = repo.open_index()
repo.object_store.add_objects(
[(o, None) for o in [tree]])
- build_index_from_tree(repo.path, repo.index_path(),
- repo.object_store, tree.id)
+ build_index_from_tree(
+ repo.path, repo.index_path(), repo.object_store, tree.id)
# Verify index entries
index = repo.open_index()
TestCase,
)
+
class TestLRUCache(TestCase):
"""Test that LRU cache properly keeps track of entries."""
def test_cleanup(self):
"""Test that we can use a cleanup function."""
cleanup_called = []
+
def cleanup_func(key, val):
cleanup_called.append((key, val))
def test_cleanup_on_replace(self):
"""Replacing an object should cleanup the old value."""
cleanup_called = []
+
def cleanup_func(key, val):
cleanup_called.append((key, val))
self.assertEqual(8, len(cache))
- cache[1] = 15 # replacement
+ cache[1] = 15 # replacement
self.assertEqual(8, len(cache))
cache[9] = 10
cache[10] = 11
self.assertEqual([3, 4, 5, 6, 7, 8, 9, 10], sorted(cache.keys()))
- cache[11] = 12 # triggers cleanup back to new after_cleanup_count
+ cache[11] = 12 # triggers cleanup back to new after_cleanup_count
self.assertEqual([6, 7, 8, 9, 10, 11], sorted(cache.keys()))
self.assertEqual({'test': 'key'}, cache.items())
cache.add('test2', 'key that is too big')
self.assertEqual(3, cache._value_size)
- self.assertEqual({'test':'key'}, cache.items())
+ self.assertEqual({'test': 'key'}, cache.items())
# If we would add a key, only to cleanup and remove all cached entries,
# then obviously that value should not be stored
cache.add('test3', 'bigkey')
self.assertEqual(3, cache._value_size)
- self.assertEqual({'test':'key'}, cache.items())
+ self.assertEqual({'test': 'key'}, cache.items())
cache.add('test4', 'bikey')
self.assertEqual(3, cache._value_size)
- self.assertEqual({'test':'key'}, cache.items())
+ self.assertEqual({'test': 'key'}, cache.items())
def test_no_add_over_size_cleanup(self):
"""If a large value is not cached, we will call cleanup right away."""
cleanup_calls = []
+
def cleanup(key, value):
cleanup_calls.append((key, value))
def test_adding_clears_cache_based_on_size(self):
"""The cache is cleared in LRU order until small enough"""
cache = lru_cache.LRUSizeCache(max_size=20)
- cache.add('key1', 'value') # 5 chars
- cache.add('key2', 'value2') # 6 chars
- cache.add('key3', 'value23') # 7 chars
+ cache.add('key1', 'value') # 5 chars
+ cache.add('key2', 'value2') # 6 chars
+ cache.add('key3', 'value23') # 7 chars
self.assertEqual(5+6+7, cache._value_size)
- cache['key2'] # reference key2 so it gets a newer reference time
- cache.add('key4', 'value234') # 8 chars, over limit
+ cache['key2'] # reference key2 so it gets a newer reference time
+ cache.add('key4', 'value234') # 8 chars, over limit
# We have to remove 2 keys to get back under limit
self.assertEqual(6+8, cache._value_size)
- self.assertEqual({'key2':'value2', 'key4':'value234'},
+ self.assertEqual({'key2': 'value2', 'key4': 'value234'},
cache.items())
def test_adding_clears_to_after_cleanup_size(self):
cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10)
- cache.add('key1', 'value') # 5 chars
- cache.add('key2', 'value2') # 6 chars
- cache.add('key3', 'value23') # 7 chars
+ cache.add('key1', 'value') # 5 chars
+ cache.add('key2', 'value2') # 6 chars
+ cache.add('key3', 'value23') # 7 chars
self.assertEqual(5+6+7, cache._value_size)
- cache['key2'] # reference key2 so it gets a newer reference time
- cache.add('key4', 'value234') # 8 chars, over limit
+ cache['key2'] # reference key2 so it gets a newer reference time
+ cache.add('key4', 'value234') # 8 chars, over limit
# We have to remove 3 keys to get back under limit
self.assertEqual(8, cache._value_size)
- self.assertEqual({'key4':'value234'}, cache.items())
+ self.assertEqual({'key4': 'value234'}, cache.items())
def test_custom_sizes(self):
def size_of_list(lst):
cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10,
compute_size=size_of_list)
- cache.add('key1', ['val', 'ue']) # 5 chars
- cache.add('key2', ['val', 'ue2']) # 6 chars
- cache.add('key3', ['val', 'ue23']) # 7 chars
+ cache.add('key1', ['val', 'ue']) # 5 chars
+ cache.add('key2', ['val', 'ue2']) # 6 chars
+ cache.add('key3', ['val', 'ue23']) # 7 chars
self.assertEqual(5+6+7, cache._value_size)
- cache['key2'] # reference key2 so it gets a newer reference time
- cache.add('key4', ['value', '234']) # 8 chars, over limit
+ cache['key2'] # reference key2 so it gets a newer reference time
+ cache.add('key4', ['value', '234']) # 8 chars, over limit
# We have to remove 3 keys to get back under limit
self.assertEqual(8, cache._value_size)
- self.assertEqual({'key4':['value', '234']}, cache.items())
+ self.assertEqual({'key4': ['value', '234']}, cache.items())
def test_cleanup(self):
cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10)
# Add these in order
- cache.add('key1', 'value') # 5 chars
- cache.add('key2', 'value2') # 6 chars
- cache.add('key3', 'value23') # 7 chars
+ cache.add('key1', 'value') # 5 chars
+ cache.add('key2', 'value2') # 6 chars
+ cache.add('key3', 'value23') # 7 chars
self.assertEqual(5+6+7, cache._value_size)
cache.cleanup()
self.assertEqual([2, 3, 4, 5, 6], sorted(cache.keys()))
cache[7] = 'stu'
self.assertEqual([4, 5, 6, 7], sorted(cache.keys()))
-
def assertMissingMatch(self, haves, wants, expected):
for sha, path in self.store.find_missing_objects(haves, wants):
- self.assertTrue(sha in expected,
- "(%s,%s) erroneously reported as missing" % (sha, path))
+ self.assertTrue(
+ sha in expected,
+ "(%s,%s) erroneously reported as missing" % (sha, path))
expected.remove(sha)
- self.assertEqual(len(expected), 0,
- "some objects are not reported as missing: %s" % (expected, ))
+ self.assertEqual(
+ len(expected), 0,
+ "some objects are not reported as missing: %s" % (expected, ))
class MOFLinearRepoTest(MissingObjectFinderTest):
def setUp(self):
super(MOFLinearRepoTest, self).setUp()
- f1_1 = make_object(Blob, data=b'f1') # present in 1, removed in 3
- f2_1 = make_object(Blob, data=b'f2') # present in all revisions, changed in 2 and 3
+ # present in 1, removed in 3
+ f1_1 = make_object(Blob, data=b'f1')
+ # present in all revisions, changed in 2 and 3
+ f2_1 = make_object(Blob, data=b'f2')
f2_2 = make_object(Blob, data=b'f2-changed')
f2_3 = make_object(Blob, data=b'f2-changed-again')
- f3_2 = make_object(Blob, data=b'f3') # added in 2, left unmodified in 3
+ # added in 2, left unmodified in 3
+ f3_2 = make_object(Blob, data=b'f3')
commit_spec = [[1], [2, 1], [3, 2]]
trees = {1: [(b'f1', f1_1), (b'f2', f2_1)],
2: [(b'f1', f1_1), (b'f2', f2_2), (b'f3', f3_2)],
- 3: [(b'f2', f2_3), (b'f3', f3_2)] }
+ 3: [(b'f2', f2_3), (b'f3', f3_2)]}
# commit 1: f1 and f2
# commit 2: f3 added, f2 changed. Missing shall report commit id and a
# tree referenced by commit
f2_2.id, f3_2.id, f2_3.id]
def test_1_to_2(self):
- self.assertMissingMatch([self.cmt(1).id], [self.cmt(2).id],
- self.missing_1_2)
+ self.assertMissingMatch(
+ [self.cmt(1).id], [self.cmt(2).id],
+ self.missing_1_2)
def test_2_to_3(self):
- self.assertMissingMatch([self.cmt(2).id], [self.cmt(3).id],
- self.missing_2_3)
+ self.assertMissingMatch(
+ [self.cmt(2).id], [self.cmt(3).id],
+ self.missing_2_3)
def test_1_to_3(self):
- self.assertMissingMatch([self.cmt(1).id], [self.cmt(3).id],
- self.missing_1_3)
+ self.assertMissingMatch(
+ [self.cmt(1).id], [self.cmt(3).id],
+ self.missing_1_3)
def test_bogus_haves(self):
"""Ensure non-existent SHA in haves are tolerated"""
bogus_sha = self.cmt(2).id[::-1]
haves = [self.cmt(1).id]
wants = [self.cmt(3).id, bogus_sha]
- self.assertRaises(KeyError, self.store.find_missing_objects,
- haves, wants)
+ self.assertRaises(
+ KeyError, self.store.find_missing_objects, haves, wants)
def test_no_changes(self):
self.assertMissingMatch([self.cmt(3).id], [self.cmt(3).id], [])
f1_1 = make_object(Blob, data=b'f1')
f1_2 = make_object(Blob, data=b'f1-2')
f1_4 = make_object(Blob, data=b'f1-4')
- f1_7 = make_object(Blob, data=b'f1-2') # same data as in rev 2
+ f1_7 = make_object(Blob, data=b'f1-2') # same data as in rev 2
f2_1 = make_object(Blob, data=b'f2')
f2_3 = make_object(Blob, data=b'f2-3')
f3_3 = make_object(Blob, data=b'f3')
f3_5 = make_object(Blob, data=b'f3-5')
commit_spec = [[1], [2, 1], [3, 2], [4, 2], [5, 3], [6, 3, 4], [7, 6]]
trees = {1: [(b'f1', f1_1), (b'f2', f2_1)],
- 2: [(b'f1', f1_2), (b'f2', f2_1)], # f1 changed
- # f3 added, f2 changed
- 3: [(b'f1', f1_2), (b'f2', f2_3), (b'f3', f3_3)],
- 4: [(b'f1', f1_4), (b'f2', f2_1)], # f1 changed
- 5: [(b'f1', f1_2), (b'f3', f3_5)], # f2 removed, f3 changed
- 6: [(b'f1', f1_4), (b'f2', f2_3), (b'f3', f3_3)], # merged 3 and 4
- # f1 changed to match rev2. f3 removed
- 7: [(b'f1', f1_7), (b'f2', f2_3)]}
+ 2: [(b'f1', f1_2), (b'f2', f2_1)], # f1 changed
+ # f3 added, f2 changed
+ 3: [(b'f1', f1_2), (b'f2', f2_3), (b'f3', f3_3)],
+ 4: [(b'f1', f1_4), (b'f2', f2_1)], # f1 changed
+ 5: [(b'f1', f1_2), (b'f3', f3_5)], # f2 removed, f3 changed
+ # merged 3 and 4
+ 6: [(b'f1', f1_4), (b'f2', f2_3), (b'f3', f3_3)],
+ # f1 changed to match rev2. f3 removed
+ 7: [(b'f1', f1_7), (b'f2', f2_3)]}
self.commits = build_commit_graph(self.store, commit_spec, trees)
self.f1_2_id = f1_2.id
# which is an overkill (i.e. in sha_done it records f1_4 as known, and
# doesn't record f1_2 was known prior to that, hence can't detect f1_7
# is in fact f1_2 and shall not be reported)
- self.assertMissingMatch([self.cmt(6).id], [self.cmt(7).id],
- [self.cmt(7).id, self.cmt(7).tree, self.f1_7_id])
+ self.assertMissingMatch(
+ [self.cmt(6).id], [self.cmt(7).id],
+ [self.cmt(7).id, self.cmt(7).tree, self.f1_7_id])
def test_have4_want7(self):
# have 4, want 7. Shall not include rev5 as it is not in the tree
tree1_id = commit_tree(self.store, blobs_1)
blobs_2 = [(b'a', blob_a2.id, 0o100644), (b'b', blob_b.id, 0o100644)]
tree2_id = commit_tree(self.store, blobs_2)
- change_a = ((b'a', b'a'), (0o100644, 0o100644), (blob_a1.id, blob_a2.id))
+ change_a = ((b'a', b'a'), (0o100644, 0o100644),
+ (blob_a1.id, blob_a2.id))
self.assertEqual([change_a],
list(self.store.tree_changes(tree1_id, tree2_id)))
self.assertEqual(
- [change_a, ((b'b', b'b'), (0o100644, 0o100644), (blob_b.id, blob_b.id))],
- list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)))
+ [change_a, ((b'b', b'b'), (0o100644, 0o100644),
+ (blob_b.id, blob_b.id))],
+ list(self.store.tree_changes(tree1_id, tree2_id,
+ want_unchanged=True)))
def test_iter_tree_contents(self):
blob_a = make_object(Blob, data=b'a')
class ObjectStoreGraphWalkerTests(TestCase):
def get_walker(self, heads, parent_map):
- new_parent_map = dict([
- (k * 40, [(p * 40) for p in ps]) for (k, ps) in parent_map.items()])
+ new_parent_map = dict(
+ [(k * 40, [(p * 40) for p in ps])
+ for (k, ps) in parent_map.items()])
return ObjectStoreGraphWalker([x * 40 for x in heads],
- new_parent_map.__getitem__)
+ new_parent_map.__getitem__)
def test_ack_invalid_value(self):
gw = self.get_walker([], {})
# A branch (a, c) or (b, d) may be done after 2 steps or 3 depending on
# the order walked: 3-step walks include (a, b, c) and (b, a, d), etc.
if walk == [b"a" * 40, b"c" * 40] or walk == [b"b" * 40, b"d" * 40]:
- gw.ack(walk[0])
- acked = True
+ gw.ack(walk[0])
+ acked = True
walk.append(next(gw))
if not acked and walk[2] == b"c" * 40:
- gw.ack(b"a" * 40)
+ gw.ack(b"a" * 40)
elif not acked and walk[2] == b"d" * 40:
- gw.ack(b"b" * 40)
+ gw.ack(b"b" * 40)
walk.append(next(gw))
self.assertIs(None, next(gw))
- self.assertEqual([b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40], sorted(walk))
+ self.assertEqual([b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40],
+ sorted(walk))
self.assertLess(walk.index(b"a" * 40), walk.index(b"c" * 40))
self.assertLess(walk.index(b"b" * 40), walk.index(b"d" * 40))
def test_splitlines(self):
for case in [
- [],
- [b'foo\nbar\n'],
- [b'bl\na', b'blie'],
- [b'bl\na', b'blie', b'bloe\n'],
- [b'', b'bl\na', b'blie', b'bloe\n'],
- [b'', b'', b'', b'bla\n'],
- [b'', b'', b'', b'bla\n', b''],
- [b'bl', b'', b'a\naaa'],
- [b'a\naaa', b'a'],
- ]:
+ [],
+ [b'foo\nbar\n'],
+ [b'bl\na', b'blie'],
+ [b'bl\na', b'blie', b'bloe\n'],
+ [b'', b'bl\na', b'blie', b'bloe\n'],
+ [b'', b'', b'', b'bla\n'],
+ [b'', b'', b'', b'bla\n', b''],
+ [b'bl', b'', b'a\naaa'],
+ [b'a\naaa', b'a'],
+ ]:
b = Blob()
b.chunked = case
self.assertEqual(b.data.splitlines(True), b.splitlines())
def test_read_tree_from_file_parse_count(self):
old_deserialize = Tree._deserialize
+
def reset_deserialize():
Tree._deserialize = old_deserialize
self.addCleanup(reset_deserialize)
self.deserialize_count = 0
+
def counting_deserialize(*args, **kwargs):
self.deserialize_count += 1
return old_deserialize(*args, **kwargs)
self.assertEqual(t.name, b'signed')
self.assertEqual(t.tagger, b'Ali Sabil <ali.sabil@gmail.com>')
self.assertEqual(t.tag_time, 1231203091)
- self.assertEqual(t.message, b'This is a signed tag\n-----BEGIN PGP SIGNATURE-----\nVersion: GnuPG v1.4.9 (GNU/Linux)\n\niEYEABECAAYFAkliqx8ACgkQqSMmLy9u/kcx5ACfakZ9NnPl02tOyYP6pkBoEkU1\n5EcAn0UFgokaSvS371Ym/4W9iJj6vh3h\n=ql7y\n-----END PGP SIGNATURE-----\n')
+ self.assertEqual(
+ t.message,
+ b'This is a signed tag\n'
+ b'-----BEGIN PGP SIGNATURE-----\n'
+ b'Version: GnuPG v1.4.9 (GNU/Linux)\n'
+ b'\n'
+ b'iEYEABECAAYFAkliqx8ACgkQqSMmLy9u/'
+ b'kcx5ACfakZ9NnPl02tOyYP6pkBoEkU1\n'
+ b'5EcAn0UFgokaSvS371Ym/4W9iJj6vh3h\n'
+ b'=ql7y\n'
+ b'-----END PGP SIGNATURE-----\n')
def test_read_commit_from_file(self):
sha = b'60dacdc733de308bb77bb76ce0fb0f9b44c9769e'
def assertCheckFails(self, cls, data):
obj = cls()
+
def do_check():
obj.set_raw_string(data)
obj.check()
-----END PGP SIGNATURE-----
Merge ../b
-""", commit.as_raw_string())
+""", commit.as_raw_string()) # noqa: W291,W293
def test_serialize_mergetag(self):
tag = make_object(
-----END PGP SIGNATURE-----
Merge ../b
-""", commit.as_raw_string())
+""", commit.as_raw_string()) # noqa: W291,W293
def test_serialize_mergetags(self):
tag = make_object(
-----END PGP SIGNATURE-----
Merge ../b
-""", commit.as_raw_string())
+""", commit.as_raw_string()) # noqa: W291,W293
def test_deserialize_mergetag(self):
tag = make_object(
self.assertEqual(commit, d)
-default_committer = b'James Westby <jw+debian@jameswestby.net> 1174773719 +0000'
+default_committer = (
+ b'James Westby <jw+debian@jameswestby.net> 1174773719 +0000')
+
class CommitParseTests(ShaFileCheckTests):
def make_commit_lines(self,
tree=b'd80c186a03f423a81b39df39dc87fd269736ca86',
- parents=[b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
- b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6'],
+ parents=[
+ b'ab64bbdcc51b170d21588e5c5d391ee5c0c96dfd',
+ b'4cffe90e0a41ad3f5190079d7c8f036bde29cbe6'],
author=default_committer,
committer=default_committer,
encoding=None,
c.parents)
expected_time = datetime.datetime(2007, 3, 24, 22, 1, 59)
self.assertEqual(expected_time,
- datetime.datetime.utcfromtimestamp(c.commit_time))
+ datetime.datetime.utcfromtimestamp(c.commit_time))
self.assertEqual(0, c.commit_timezone)
self.assertEqual(expected_time,
- datetime.datetime.utcfromtimestamp(c.author_time))
+ datetime.datetime.utcfromtimestamp(c.author_time))
self.assertEqual(0, c.author_timezone)
self.assertEqual(None, c.encoding)
-----END PGP SIGNATURE-----
foo
-""")
+""") # noqa: W291,W293
self.assertEqual(b'foo\n', c.message)
self.assertEqual([], c.extra)
self.assertEqual(b"""-----BEGIN PGP SIGNATURE-----
3.3.0 version bump and docs
-''')
+''') # noqa: W291,W293
self.assertEqual([], c.extra)
self.assertEqual(b'''\
-----BEGIN PGP SIGNATURE-----
_SORTED_TREE_ITEMS = [
TreeEntry(b'a.c', 0o100755, b'd80c186a03f423a81b39df39dc87fd269736ca86'),
TreeEntry(b'a', stat.S_IFDIR, b'd80c186a03f423a81b39df39dc87fd269736ca86'),
- TreeEntry(b'a/c', stat.S_IFDIR, b'd80c186a03f423a81b39df39dc87fd269736ca86'),
+ TreeEntry(b'a/c', stat.S_IFDIR,
+ b'd80c186a03f423a81b39df39dc87fd269736ca86'),
]
x = Tree()
x.add(b'myname', 0o100755, myhexsha)
self.assertEqual(x[b'myname'], (0o100755, myhexsha))
- self.assertEqual(b'100755 myname\0' + hex_to_sha(myhexsha),
+ self.assertEqual(
+ b'100755 myname\0' + hex_to_sha(myhexsha),
x.as_raw_string())
def test_add_old_order(self):
# C/Python implementations may differ in specific error types, but
# should all error on invalid inputs.
# For example, the C implementation has stricter type checks, so may
- # raise TypeError where the Python implementation raises AttributeError.
+ # raise TypeError where the Python implementation raises
+ # AttributeError.
errors = (TypeError, ValueError, AttributeError)
self.assertRaises(errors, do_sort, b'foo')
self.assertRaises(errors, do_sort, {b'foo': (1, 2, 3)})
# shas
self.assertCheckFails(t, b'100644 a\0' + (b'x' * 5))
self.assertCheckFails(t, b'100644 a\0' + (b'x' * 18) + b'\0')
- self.assertCheckFails(t, b'100644 a\0' + (b'x' * 21) + b'\n100644 b\0' + sha)
+ self.assertCheckFails(
+ t, b'100644 a\0' + (b'x' * 21) + b'\n100644 b\0' + sha)
# ordering
sha2 = hex_to_sha(b_sha)
- self.assertCheckSucceeds(t, b'100644 a\0' + sha + b'\n100644 b\0' + sha)
- self.assertCheckSucceeds(t, b'100644 a\0' + sha + b'\n100644 b\0' + sha2)
+ self.assertCheckSucceeds(
+ t, b'100644 a\0' + sha + b'\n100644 b\0' + sha)
+ self.assertCheckSucceeds(
+ t, b'100644 a\0' + sha + b'\n100644 b\0' + sha2)
self.assertCheckFails(t, b'100644 a\0' + sha + b'\n100755 a\0' + sha2)
self.assertCheckFails(t, b'100644 b\0' + sha2 + b'\n100644 a\0' + sha)
def test_commit_by_sha(self):
r = MemoryRepo()
- c1, c2, c3 = build_commit_graph(r.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ r.object_store, [[1], [2, 1], [3, 1, 2]])
self.assertEqual([c1], list(parse_commit_range(r, c1.id)))
def test_head(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", False),
- parse_reftuple(r, r, b"foo"))
+ parse_reftuple(r, r, b"foo"))
self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", True),
- parse_reftuple(r, r, b"+foo"))
+ parse_reftuple(r, r, b"+foo"))
self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", True),
- parse_reftuple(r, {}, b"+foo"))
+ parse_reftuple(r, {}, b"+foo"))
def test_full(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", False),
- parse_reftuple(r, r, b"refs/heads/foo"))
+ parse_reftuple(r, r, b"refs/heads/foo"))
def test_no_left_ref(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual((None, b"refs/heads/foo", False),
- parse_reftuple(r, r, b":refs/heads/foo"))
+ parse_reftuple(r, r, b":refs/heads/foo"))
def test_no_right_ref(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual((b"refs/heads/foo", None, False),
- parse_reftuple(r, r, b"refs/heads/foo:"))
+ parse_reftuple(r, r, b"refs/heads/foo:"))
class ParseReftuplesTests(TestCase):
def test_nonexistent(self):
r = {}
self.assertRaises(KeyError, parse_reftuples, r, r,
- [b"thisdoesnotexist"])
+ [b"thisdoesnotexist"])
def test_head(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual([(b"refs/heads/foo", b"refs/heads/foo", False)],
- parse_reftuples(r, r, [b"foo"]))
+ parse_reftuples(r, r, [b"foo"]))
def test_full(self):
r = {b"refs/heads/foo": "bla"}
self.assertEqual([(b"refs/heads/foo", b"refs/heads/foo", False)],
- parse_reftuples(r, r, b"refs/heads/foo"))
+ parse_reftuples(r, r, b"refs/heads/foo"))
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
- datadir = os.path.abspath(os.path.join(os.path.dirname(__file__),
- 'data/packs'))
+ datadir = os.path.abspath(
+ os.path.join(os.path.dirname(__file__), 'data/packs'))
def get_pack_index(self, sha):
"""Returns a PackIndex from the datadir with the given sha"""
- return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha.decode('ascii')))
+ return load_pack_index(
+ os.path.join(self.datadir,
+ 'pack-%s.idx' % sha.decode('ascii')))
def get_pack_data(self, sha):
"""Returns a PackData object from the datadir with the given sha"""
- return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha.decode('ascii')))
+ return PackData(
+ os.path.join(
+ self.datadir, 'pack-%s.pack' % sha.decode('ascii')))
def get_pack(self, sha):
- return Pack(os.path.join(self.datadir, 'pack-%s' % sha.decode('ascii')))
+ return Pack(
+ os.path.join(self.datadir, 'pack-%s' % sha.decode('ascii')))
def assertSucceeds(self, func, *args, **kwargs):
try:
test_string_huge = b'Z' * 100000
def _test_roundtrip(self, base, target):
- self.assertEqual(target,
- b''.join(apply_delta(base, create_delta(base, target))))
+ self.assertEqual(
+ target,
+ b''.join(apply_delta(base, create_delta(base, target))))
def test_nochange(self):
self._test_roundtrip(self.test_string1, self.test_string1)
self.test_string_huge + self.test_string2)
def test_dest_overflow(self):
- self.assertRaises(
- ApplyDeltaError,
- apply_delta, b'a'*0x10000, b'\x80\x80\x04\x80\x80\x04\x80' + b'a'*0x10000)
+ self.assertRaises(ApplyDeltaError, apply_delta,
+ b'a'*0x10000, b'\x80\x80\x04\x80\x80\x04\x80' +
+ b'a'*0x10000)
self.assertRaises(
ApplyDeltaError,
apply_delta, b'', b'\x00\x80\x02\xb0\x11\x11')
chunks = [
b'tree 03207ccf58880a748188836155ceed72f03d65d6\n'
b'parent 408fbab530fd4abe49249a636a10f10f44d07a21\n'
- b'author Victor Stinner <victor.stinner@gmail.com> 1421355207 +0100\n'
- b'committer Victor Stinner <victor.stinner@gmail.com> 1421355207 +0100\n'
+ b'author Victor Stinner <victor.stinner@gmail.com> '
+ b'1421355207 +0100\n'
+ b'committer Victor Stinner <victor.stinner@gmail.com> '
+ b'1421355207 +0100\n'
b'\n'
b'Backout changeset 3a06020af8cf\n'
- b'\nStreamWriter: close() now clears the reference to the transport\n'
- b'\nStreamWriter now raises an exception if it is closed: write(), writelines(),\n'
+ b'\nStreamWriter: close() now clears the reference to the '
+ b'transport\n'
+ b'\nStreamWriter now raises an exception if it is closed: '
+ b'write(), writelines(),\n'
b'write_eof(), can_write_eof(), get_extra_info(), drain().\n']
delta = [
b'\xcd\x03\xad\x03]tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n'
b'parent 20a103cc90135494162e819f98d0edfc1f1fba6b',
b'\nauthor Victor Stinner <victor.stinner@gmail.com> 14213',
b'10738',
- b' +0100\ncommitter Victor Stinner <victor.stinner@gmail.com> 14213',
+ b' +0100\ncommitter Victor Stinner <victor.stinner@gmail.com> '
+ b'14213',
b'10738 +0100',
- b'\n\nStreamWriter: close() now clears the reference to the transport\n\n'
- b'StreamWriter now raises an exception if it is closed: write(), writelines(),\n'
+ b'\n\nStreamWriter: close() now clears the reference to the '
+ b'transport\n\n'
+ b'StreamWriter now raises an exception if it is closed: '
+ b'write(), writelines(),\n'
b'write_eof(), can_write_eof(), get_extra_info(), drain().\n']
self.assertEqual(b''.join(expected), b''.join(res))
self.get_pack_data(pack1_sha).close()
def test_from_file(self):
- path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha.decode('ascii'))
+ path = os.path.join(self.datadir,
+ 'pack-%s.pack' % pack1_sha.decode('ascii'))
with open(path, 'rb') as f:
PackData.from_file(f, os.path.getsize(path))
def test_iterobjects(self):
with self.get_pack_data(pack1_sha) as p:
- commit_data = (b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
- b'author James Westby <jw+debian@jameswestby.net> '
- b'1174945067 +0100\n'
- b'committer James Westby <jw+debian@jameswestby.net> '
- b'1174945067 +0100\n'
- b'\n'
- b'Test commit\n')
+ commit_data = (
+ b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
+ b'author James Westby <jw+debian@jameswestby.net> '
+ b'1174945067 +0100\n'
+ b'committer James Westby <jw+debian@jameswestby.net> '
+ b'1174945067 +0100\n'
+ b'\n'
+ b'Test commit\n')
blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_data = b'100644 a\0' + hex_to_sha(blob_sha)
actual = []
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
self.assertRaises(AssertionError, compute_file_sha, f, start_ofs=10,
- end_ofs=-12)
+ end_ofs=-12)
class TestPack(PackTests):
def test_pack_tuples(self):
with self.get_pack(pack1_sha) as p:
tuples = p.pack_tuples()
- expected = set([(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
+ expected = set(
+ [(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
self.assertEqual(expected, set(list(tuples)))
self.assertEqual(expected, set(list(tuples)))
self.assertEqual(3, len(tuples))
self.assertSucceeds(newpack.index.check)
self.assertEqual(origpack.name(), newpack.name())
self.assertEqual(origpack.index.get_pack_checksum(),
- newpack.index.get_pack_checksum())
+ newpack.index.get_pack_checksum())
wrong_version = origpack.index.version != newpack.index.version
orig_checksum = origpack.index.get_stored_checksum()
entry1_sha = hex_to_sha('4e6388232ec39792661e2e75db8fb117fc869ce6')
entry2_sha = hex_to_sha('e98f071751bd77f59967bfa671cd2caebdccc9a2')
entries = [(entry1_sha, 0xf2972d0830529b87, 24),
- (entry2_sha, (~0xf2972d0830529b87)&(2**64-1), 92)]
+ (entry2_sha, (~0xf2972d0830529b87) & (2 ** 64 - 1), 92)]
if not self._supports_large:
self.assertRaises(TypeError, self.index, 'single.idx',
- entries, pack_checksum)
+ entries, pack_checksum)
return
idx = self.index('single.idx', entries, pack_checksum)
self.assertEqual(idx.get_pack_checksum(), pack_checksum)
def setUp(self):
super(ReadZlibTests, self).setUp()
self.read = BytesIO(self.comp + self.extra).read
- self.unpacked = UnpackedObject(Tree.type_num, None, len(self.decomp), 0)
+ self.unpacked = UnpackedObject(
+ Tree.type_num, None, len(self.decomp), 0)
def test_decompress_size(self):
good_decomp_len = len(self.decomp)
n = 100
objects_spec = [(Blob.type_num, b'blob')]
for i in range(n):
- objects_spec.append((OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
+ objects_spec.append(
+ (OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
f = BytesIO()
entries = build_pack(f, objects_spec)
self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))
n = 100
objects_spec = [(Blob.type_num, b'blob')]
for i in range(n):
- objects_spec.append((OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
+ objects_spec.append(
+ (OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
f = BytesIO()
entries = build_pack(f, objects_spec)
self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))
def test_bad_ext_ref_non_thin_pack(self):
blob, = self.store_blobs([b'blob'])
f = BytesIO()
- entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
- store=self.store)
+ build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))], store=self.store)
pack_iter = self.make_pack_iter(f, thin=False)
try:
list(pack_iter._walk_all_chains())
def test_basic(self):
self.assertEqual(b'\x80', _encode_copy_operation(0, 0))
self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10))
- self.assertEqual(b'\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000))
- self.assertEqual(b'\x93\xe8\x03\x01', _encode_copy_operation(1000, 1))
+ self.assertEqual(b'\xb1\x64\xe8\x03',
+ _encode_copy_operation(100, 1000))
+ self.assertEqual(b'\x93\xe8\x03\x01',
+ _encode_copy_operation(1000, 1))
write_commit_patch(f, c, b"CONTENTS", (1, 1), version="custom")
f.seek(0)
lines = f.readlines()
- self.assertTrue(lines[0].startswith(b"From 0b0d34d1b5b596c928adc9a727a4b9e03d025298"))
+ self.assertTrue(lines[0].startswith(
+ b"From 0b0d34d1b5b596c928adc9a727a4b9e03d025298"))
self.assertEqual(lines[1], b"From: Jelmer <jelmer@samba.org>\n")
self.assertTrue(lines[2].startswith(b"Date: "))
self.assertEqual([
class ReadGitAmPatch(TestCase):
def test_extract_string(self):
- text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
+ text = b"""\
+From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
From: Jelmer Vernooij <jelmer@samba.org>
Date: Thu, 15 Apr 2010 15:40:28 +0200
-Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a lintian warning).
+Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning).
---
pixmaps/prey.ico | Bin 9662 -> 9662 bytes
--
1.7.0.4
-"""
- c, diff, version = git_am_patch_split(StringIO(text.decode("utf-8")), "utf-8")
+""" # noqa: W291
+ c, diff, version = git_am_patch_split(
+ StringIO(text.decode("utf-8")), "utf-8")
self.assertEqual(b"Jelmer Vernooij <jelmer@samba.org>", c.committer)
self.assertEqual(b"Jelmer Vernooij <jelmer@samba.org>", c.author)
self.assertEqual(b"Remove executable bit from prey.ico "
- b"(triggers a lintian warning).\n", c.message)
+ b"(triggers a warning).\n", c.message)
self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes
1 files changed, 0 insertions(+), 0 deletions(-)
mode change 100755 => 100644 pixmaps/prey.ico
self.assertEqual(b"1.7.0.4", version)
def test_extract_bytes(self):
- text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
+ text = b"""\
+From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
From: Jelmer Vernooij <jelmer@samba.org>
Date: Thu, 15 Apr 2010 15:40:28 +0200
-Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a lintian warning).
+Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning).
---
pixmaps/prey.ico | Bin 9662 -> 9662 bytes
--
1.7.0.4
-"""
+""" # noqa: W291
c, diff, version = git_am_patch_split(BytesIO(text))
self.assertEqual(b"Jelmer Vernooij <jelmer@samba.org>", c.committer)
self.assertEqual(b"Jelmer Vernooij <jelmer@samba.org>", c.author)
self.assertEqual(b"Remove executable bit from prey.ico "
- b"(triggers a lintian warning).\n", c.message)
+ b"(triggers a warning).\n", c.message)
self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes
1 files changed, 0 insertions(+), 0 deletions(-)
mode change 100755 => 100644 pixmaps/prey.ico
--
1.7.0.4
-"""
+""" # noqa: W291
c, diff, version = git_am_patch_split(BytesIO(text), "utf-8")
- self.assertEqual(b'Added unit tests for dulwich.object_store.tree_lookup_path.\n\n* dulwich/tests/test_object_store.py\n (TreeLookupPathTests): This test case contains a few tests that ensure the\n tree_lookup_path function works as expected.\n', c.message)
+ self.assertEqual(b'''\
+Added unit tests for dulwich.object_store.tree_lookup_path.
+
+* dulwich/tests/test_object_store.py
+ (TreeLookupPathTests): This test case contains a few tests that ensure the
+ tree_lookup_path function works as expected.
+''', c.message)
def test_extract_pseudo_from_header(self):
text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
--
1.7.0.4
-"""
+""" # noqa: W291
c, diff, version = git_am_patch_split(BytesIO(text), "utf-8")
self.assertEqual(b"Jelmer Vernooy <jelmer@debian.org>", c.author)
- self.assertEqual(b'Added unit tests for dulwich.object_store.tree_lookup_path.\n\n* dulwich/tests/test_object_store.py\n (TreeLookupPathTests): This test case contains a few tests that ensure the\n tree_lookup_path function works as expected.\n', c.message)
+ self.assertEqual(b'''\
+Added unit tests for dulwich.object_store.tree_lookup_path.
+
+* dulwich/tests/test_object_store.py
+ (TreeLookupPathTests): This test case contains a few tests that ensure the
+ tree_lookup_path function works as expected.
+''', c.message)
def test_extract_no_version_tail(self):
- text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
+ text = b"""\
+From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001
From: Jelmer Vernooij <jelmer@samba.org>
Date: Thu, 15 Apr 2010 15:40:28 +0200
Subject: [Dulwich-users] [PATCH] Added unit tests for
self.assertEqual(None, version)
def test_extract_mercurial(self):
- raise SkipTest("git_am_patch_split doesn't handle Mercurial patches properly yet")
- expected_diff = """diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py
+ raise SkipTest(
+ "git_am_patch_split doesn't handle Mercurial patches "
+ "properly yet")
+ expected_diff = """\
+diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py
--- a/dulwich/tests/test_patch.py
+++ b/dulwich/tests/test_patch.py
@@ -158,7 +158,7 @@
class DiffTests(TestCase):
-"""
- text = """From dulwich-users-bounces+jelmer=samba.org@lists.launchpad.net Mon Nov 29 00:58:18 2010
+""" # noqa: W291,W293
+ text = """\
+From dulwich-users-bounces+jelmer=samba.org@lists.launchpad.net \
+Mon Nov 29 00:58:18 2010
Date: Sun, 28 Nov 2010 17:57:27 -0600
From: Augie Fackler <durin42@gmail.com>
To: dulwich-users <dulwich-users@lists.launchpad.net>
Unsubscribe : https://launchpad.net/~dulwich-users
More help : https://help.launchpad.net/ListHelp
-""" % expected_diff
+""" % expected_diff # noqa: W291
c, diff, version = git_am_patch_split(BytesIO(text))
self.assertEqual(expected_diff, diff)
self.assertEqual(None, version)
f = BytesIO()
# Prepare two slightly different PNG headers
b1 = Blob.from_string(
- b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52"
- b"\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x04\x00\x00\x00\x05\x04\x8b")
+ b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a"
+ b"\x00\x00\x00\x0d\x49\x48\x44\x52"
+ b"\x00\x00\x01\xd5\x00\x00\x00\x9f"
+ b"\x08\x04\x00\x00\x00\x05\x04\x8b")
b2 = Blob.from_string(
- b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52"
- b"\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x03\x00\x00\x00\x98\xd3\xb3")
+ b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a"
+ b"\x00\x00\x00\x0d\x49\x48\x44\x52"
+ b"\x00\x00\x01\xd5\x00\x00\x00\x9f"
+ b"\x08\x03\x00\x00\x00\x98\xd3\xb3")
store = MemoryObjectStore()
store.add_objects([(b1, None), (b2, None)])
write_object_diff(
b' \x89PNG',
b' \x1a',
b' \x00\x00\x00',
- b'-IHDR\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x04\x00\x00\x00\x05\x04\x8b',
+ b'-IHDR\x00\x00\x01\xd5\x00\x00\x00'
+ b'\x9f\x08\x04\x00\x00\x00\x05\x04\x8b',
b'\\ No newline at end of file',
- b'+IHDR\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x03\x00\x00\x00\x98\xd3\xb3',
+ b'+IHDR\x00\x00\x01\xd5\x00\x00\x00\x9f'
+ b'\x08\x03\x00\x00\x00\x98\xd3\xb3',
b'\\ No newline at end of file'
], f.getvalue().splitlines())
f = BytesIO()
# Prepare two slightly different PNG headers
b1 = Blob.from_string(
- b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52"
- b"\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x04\x00\x00\x00\x05\x04\x8b")
+ b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a"
+ b"\x00\x00\x00\x0d\x49\x48\x44\x52"
+ b"\x00\x00\x01\xd5\x00\x00\x00\x9f"
+ b"\x08\x04\x00\x00\x00\x05\x04\x8b")
b2 = Blob.from_string(
- b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52"
- b"\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x03\x00\x00\x00\x98\xd3\xb3")
+ b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a"
+ b"\x00\x00\x00\x0d\x49\x48\x44\x52"
+ b"\x00\x00\x01\xd5\x00\x00\x00\x9f"
+ b"\x08\x03\x00\x00\x00\x98\xd3\xb3")
store = MemoryObjectStore()
store.add_objects([(b1, None), (b2, None)])
write_object_diff(f, store, (b'foo.png', 0o644, b1.id),
def test_object_diff_add_bin_blob(self):
f = BytesIO()
b2 = Blob.from_string(
- b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52'
- b'\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x03\x00\x00\x00\x98\xd3\xb3')
+ b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a'
+ b'\x00\x00\x00\x0d\x49\x48\x44\x52'
+ b'\x00\x00\x01\xd5\x00\x00\x00\x9f'
+ b'\x08\x03\x00\x00\x00\x98\xd3\xb3')
store = MemoryObjectStore()
store.add_object(b2)
write_object_diff(f, store, (None, None, None),
def test_object_diff_remove_bin_blob(self):
f = BytesIO()
b1 = Blob.from_string(
- b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a\x00\x00\x00\x0d\x49\x48\x44\x52'
- b'\x00\x00\x01\xd5\x00\x00\x00\x9f\x08\x04\x00\x00\x00\x05\x04\x8b')
+ b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a'
+ b'\x00\x00\x00\x0d\x49\x48\x44\x52'
+ b'\x00\x00\x01\xd5\x00\x00\x00\x9f'
+ b'\x08\x04\x00\x00\x00\x05\x04\x8b')
store = MemoryObjectStore()
store.add_object(b1)
write_object_diff(f, store, (b'foo.png', 0o644, b1.id),
"""Tests for the archive command."""
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/master"] = c3.id
out = BytesIO()
err = BytesIO()
porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out,
- errstream=err)
+ errstream=err)
self.assertEqual(b"", err.getvalue())
tf = tarfile.TarFile(fileobj=out)
self.addCleanup(tf.close)
class UpdateServerInfoTests(PorcelainTestCase):
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/foo"] = c3.id
porcelain.update_server_info(self.repo.path)
- self.assertTrue(os.path.exists(os.path.join(self.repo.controldir(),
- 'info', 'refs')))
+ self.assertTrue(os.path.exists(
+ os.path.join(self.repo.controldir(), 'info', 'refs')))
class CommitTests(PorcelainTestCase):
def test_custom_author(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/foo"] = c3.id
- sha = porcelain.commit(self.repo.path, message=b"Some message",
- author=b"Joe <joe@example.com>", committer=b"Bob <bob@example.com>")
+ sha = porcelain.commit(
+ self.repo.path, message=b"Some message",
+ author=b"Joe <joe@example.com>",
+ committer=b"Bob <bob@example.com>")
self.assertTrue(isinstance(sha, bytes))
self.assertEqual(len(sha), 40)
f.write("\n")
porcelain.add(repo=self.repo.path, paths=['blah'])
porcelain.commit(repo=self.repo.path, message=b'test',
- author=b'test', committer=b'test')
+ author=b'test', committer=b'test')
# Add a second test file and a file in a directory
with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
os.chdir(os.path.join(self.repo.path, 'foo'))
porcelain.add(repo=self.repo.path)
porcelain.commit(repo=self.repo.path, message=b'test',
- author=b'test', committer=b'test')
+ author=b'test', committer=b'test')
finally:
os.chdir(cwd)
class LogTests(PorcelainTestCase):
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
outstream = StringIO()
porcelain.log(self.repo.path, outstream=outstream)
self.assertEqual(3, outstream.getvalue().count("-" * 50))
def test_max_entries(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
outstream = StringIO()
porcelain.log(self.repo.path, outstream=outstream, max_entries=1)
class ShowTests(PorcelainTestCase):
def test_nolist(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
outstream = StringIO()
porcelain.show(self.repo.path, objects=c3.id, outstream=outstream)
self.assertTrue(outstream.getvalue().startswith("-" * 50))
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
outstream = StringIO()
porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream)
class SymbolicRefTests(PorcelainTestCase):
def test_set_wrong_symbolic_ref(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
- self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar')
+ self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path,
+ b'foobar')
def test_set_force_wrong_symbolic_ref(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True)
- #test if we actually changed the file
+ # test if we actually changed the file
with self.repo.get_named_file('HEAD') as f:
new_ref = f.read()
self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n')
def test_set_symbolic_ref(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
porcelain.symbolic_ref(self.repo.path, b'master')
def test_set_symbolic_ref_other_than_master(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]], attrs=dict(refs='develop'))
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]],
+ attrs=dict(refs='develop'))
self.repo.refs[b"HEAD"] = c3.id
self.repo.refs[b"refs/heads/develop"] = c3.id
porcelain.symbolic_ref(self.repo.path, b'develop')
- #test if we actually changed the file
+ # test if we actually changed the file
with self.repo.get_named_file('HEAD') as f:
new_ref = f.read()
self.assertEqual(new_ref, b'ref: refs/heads/develop\n')
class DiffTreeTests(PorcelainTestCase):
def test_empty(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
outstream = BytesIO()
- porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream)
+ porcelain.diff_tree(self.repo.path, c2.tree, c3.tree,
+ outstream=outstream)
self.assertEqual(outstream.getvalue(), b"")
class CommitTreeTests(PorcelainTestCase):
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
b = Blob()
b.data = b"foo the bar"
t = Tree()
class RevListTests(PorcelainTestCase):
def test_simple(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
outstream = BytesIO()
porcelain.rev_list(
self.repo.path, [c3.id], outstream=outstream)
class TagCreateTests(PorcelainTestCase):
def test_annotated(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>',
- b'bar', annotated=True)
+ b'bar', annotated=True)
tags = self.repo.refs.as_dict(b"refs/tags")
self.assertEqual(list(tags.keys()), [b"tryme"])
self.assertLess(time.time() - tag.tag_time, 5)
def test_unannotated(self):
- c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1],
- [3, 1, 2]])
+ c1, c2, c3 = build_commit_graph(
+ self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"HEAD"] = c3.id
porcelain.tag_create(self.repo.path, b"tryme", annotated=False)
f.write("BAR")
porcelain.add(self.repo.path, paths=["foo"])
porcelain.commit(self.repo.path, message=b"Some message",
- committer=b"Jane <jane@example.com>",
- author=b"John <john@example.com>")
+ committer=b"Jane <jane@example.com>",
+ author=b"John <john@example.com>")
with open(os.path.join(self.repo.path, 'foo'), 'wb') as f:
f.write(b"OOH")
f.write("BAR")
porcelain.add(self.repo.path, paths=["foo"])
sha = porcelain.commit(self.repo.path, message=b"Some message",
- committer=b"Jane <jane@example.com>",
- author=b"John <john@example.com>")
+ committer=b"Jane <jane@example.com>",
+ author=b"John <john@example.com>")
with open(os.path.join(self.repo.path, 'foo'), 'wb') as f:
f.write(b"BAZ")
porcelain.add(self.repo.path, paths=["foo"])
porcelain.commit(self.repo.path, message=b"Some other message",
- committer=b"Jane <jane@example.com>",
- author=b"John <john@example.com>")
+ committer=b"Jane <jane@example.com>",
+ author=b"John <john@example.com>")
porcelain.reset(self.repo, "hard", sha)
errstream = BytesIO()
porcelain.commit(repo=self.repo.path, message=b'init',
- author=b'', committer=b'')
+ author=b'', committer=b'')
# Setup target repo cloned from temp test repo
clone_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, clone_path)
target_repo = porcelain.clone(self.repo.path, target=clone_path,
- errstream=errstream)
+ errstream=errstream)
try:
self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD'])
finally:
os.close(handle)
porcelain.add(repo=clone_path, paths=[os.path.basename(fullpath)])
porcelain.commit(repo=clone_path, message=b'push',
- author=b'', committer=b'')
+ author=b'', committer=b'')
# Setup a non-checked out branch in the remote
refs_path = b"refs/heads/foo"
self.repo.refs[refs_path] = new_id
# Push to the remote
- porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream,
- errstream=errstream)
+ porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path,
+ outstream=outstream, errstream=errstream)
# Check that the target and source
with Repo(clone_path) as r_clone:
change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree,
self.repo[b'refs/heads/foo'].tree))[0]
self.assertEqual(os.path.basename(fullpath),
- change.new.path.decode('ascii'))
+ change.new.path.decode('ascii'))
def test_delete(self):
"""Basic test of porcelain push, removing a branch.
errstream = BytesIO()
porcelain.commit(repo=self.repo.path, message=b'init',
- author=b'', committer=b'')
+ author=b'', committer=b'')
# Setup target repo cloned from temp test repo
clone_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, clone_path)
target_repo = porcelain.clone(self.repo.path, target=clone_path,
- errstream=errstream)
+ errstream=errstream)
target_repo.close()
# Setup a non-checked out branch in the remote
self.repo.refs[refs_path] = new_id
# Push to the remote
- porcelain.push(clone_path, self.repo.path, b":" + refs_path, outstream=outstream,
- errstream=errstream)
+ porcelain.push(clone_path, self.repo.path, b":" + refs_path,
+ outstream=outstream, errstream=errstream)
self.assertEqual({
b'HEAD': new_id,
}, self.repo.get_refs())
-
class PullTests(PorcelainTestCase):
def setUp(self):
self.target_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.target_path)
target_repo = porcelain.clone(self.repo.path, target=self.target_path,
- errstream=BytesIO())
+ errstream=BytesIO())
target_repo.close()
# create a second file to be pushed
filename = os.path.basename(fullpath)
porcelain.add(repo=self.repo.path, paths=filename)
porcelain.commit(repo=self.repo.path, message=b'test2',
- author=b'test2', committer=b'test2')
+ author=b'test2', committer=b'test2')
self.assertTrue(b'refs/heads/master' in self.repo.refs)
self.assertTrue(b'refs/heads/master' in target_repo.refs)
# Pull changes into the cloned repo
porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master',
- outstream=outstream, errstream=errstream)
+ outstream=outstream, errstream=errstream)
# Check the target repo for pushed changes
with Repo(self.target_path) as r:
porcelain.add(repo=self.repo.path, paths=['foo'])
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
# modify access and modify time of path
os.utime(fullpath, (0, 0))
results = porcelain.status(self.repo)
- self.assertEqual(results.staged['add'][0], filename_add.encode('ascii'))
+ self.assertEqual(results.staged['add'][0],
+ filename_add.encode('ascii'))
self.assertEqual(results.unstaged, [b'foo'])
def test_get_tree_changes_add(self):
f.write('stuff')
porcelain.add(repo=self.repo.path, paths=filename)
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
filename = 'foo'
with open(os.path.join(self.repo.path, filename), 'w') as f:
f.write('stuff')
porcelain.add(repo=self.repo.path, paths=filename)
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
with open(fullpath, 'w') as f:
f.write('otherstuff')
porcelain.add(repo=self.repo.path, paths=filename)
f.write('stuff')
porcelain.add(repo=self.repo.path, paths=filename)
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
porcelain.rm(repo=self.repo.path, paths=[filename])
changes = porcelain.get_tree_changes(self.repo.path)
def test_upload_pack(self):
outf = BytesIO()
- exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf)
+ exitcode = porcelain.upload_pack(
+ self.repo.path, BytesIO(b"0000"), outf)
outlines = outf.getvalue().splitlines()
self.assertEqual([b"0000"], outlines)
self.assertEqual(0, exitcode)
f.write('stuff')
porcelain.add(repo=self.repo.path, paths=filename)
self.repo.do_commit(message=b'test status',
- author=b'', committer=b'', author_timestamp=1402354300,
- commit_timestamp=1402354300, author_timezone=0, commit_timezone=0)
+ author=b'', committer=b'',
+ author_timestamp=1402354300,
+ commit_timestamp=1402354300, author_timezone=0,
+ commit_timezone=0)
outf = BytesIO()
- exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf)
+ exitcode = porcelain.receive_pack(
+ self.repo.path, BytesIO(b"0000"), outf)
outlines = outf.getvalue().splitlines()
self.assertEqual([
- b'00739e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status '
+ b'00739e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status ' # noqa: E501
b'delete-refs quiet ofs-delta side-band-64k no-done',
b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master',
b'0000'], outlines)
target_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, target_path)
target_repo = porcelain.clone(self.repo.path, target=target_path,
- errstream=errstream)
+ errstream=errstream)
# create a second file to be pushed
handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
filename = os.path.basename(fullpath)
porcelain.add(repo=self.repo.path, paths=filename)
porcelain.commit(repo=self.repo.path, message=b'test2',
- author=b'test2', committer=b'test2')
+ author=b'test2', committer=b'test2')
self.assertFalse(self.repo[b'HEAD'].id in target_repo)
target_repo.close()
# Fetch changes into the cloned repo
porcelain.fetch(target_path, self.repo.path, outstream=outstream,
- errstream=errstream)
+ errstream=errstream)
# Check the target repo for pushed changes
with Repo(target_path) as r:
def test_empty(self):
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
f = StringIO()
porcelain.ls_tree(self.repo, b"HEAD", outstream=f)
porcelain.add(repo=self.repo.path, paths=['foo'])
porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
f = StringIO()
porcelain.ls_tree(self.repo, b"HEAD", outstream=f)
def test_some(self):
cid = porcelain.commit(repo=self.repo.path, message=b'test status',
- author=b'', committer=b'')
+ author=b'', committer=b'')
self.assertEqual({
b'refs/heads/master': cid,
porcelain.remote_add(
self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')
self.assertRaises(porcelain.RemoteExists, porcelain.remote_add,
- self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')
+ self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')
def recv(self, size):
# fail fast if no bytes are available; in a real socket, this would
# block forever
- if self.tell() == len(self.getvalue()) and not self.allow_read_past_eof:
+ if (self.tell() == len(self.getvalue())
+ and not self.allow_read_past_eof):
raise GitProtocolError('Blocking read past end of socket')
if size == 1:
return self.read(1)
def test_caps(self):
self.assertEqual((b'bla', [b'la']), extract_capabilities(b'bla\0la'))
self.assertEqual((b'bla', [b'la']), extract_capabilities(b'bla\0la\n'))
- self.assertEqual((b'bla', [b'la', b'la']), extract_capabilities(b'bla\0la la'))
+ self.assertEqual((b'bla', [b'la', b'la']),
+ extract_capabilities(b'bla\0la la'))
def test_plain_want_line(self):
- self.assertEqual((b'want bla', []), extract_want_line_capabilities(b'want bla'))
+ self.assertEqual((b'want bla', []),
+ extract_want_line_capabilities(b'want bla'))
def test_caps_want_line(self):
self.assertEqual((b'want bla', [b'la']),
- extract_want_line_capabilities(b'want bla la'))
+ extract_want_line_capabilities(b'want bla la'))
self.assertEqual((b'want bla', [b'la']),
- extract_want_line_capabilities(b'want bla la\n'))
+ extract_want_line_capabilities(b'want bla la\n'))
self.assertEqual((b'want bla', [b'la', b'la']),
- extract_want_line_capabilities(b'want bla la la'))
+ extract_want_line_capabilities(b'want bla la la'))
def test_ack_type(self):
self.assertEqual(SINGLE_ACK, ack_type([b'foo', b'bar']))
self.assertEqual(MULTI_ACK, ack_type([b'foo', b'bar', b'multi_ack']))
self.assertEqual(MULTI_ACK_DETAILED,
- ack_type([b'foo', b'bar', b'multi_ack_detailed']))
+ ack_type([b'foo', b'bar', b'multi_ack_detailed']))
# choose detailed when both present
self.assertEqual(MULTI_ACK_DETAILED,
- ack_type([b'foo', b'bar', b'multi_ack',
- b'multi_ack_detailed']))
+ ack_type([b'foo', b'bar', b'multi_ack',
+ b'multi_ack_detailed']))
class BufferedPktLineWriterTests(TestCase):
1446552482, 0, b'clone: from git://jelmer.uk/samba'))
def test_parse(self):
+ reflog_line = (
+ b'0000000000000000000000000000000000000000 '
+ b'49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij '
+ b'<jelmer@jelmer.uk> 1446552482 +0000 '
+ b'clone: from git://jelmer.uk/samba'
+ )
self.assertEqual(
(b'0000000000000000000000000000000000000000',
b'49030649db3dfec5a9bc03e5dde4255a14499f16',
b'Jelmer Vernooij <jelmer@jelmer.uk>',
1446552482, 0, b'clone: from git://jelmer.uk/samba'),
- parse_reflog_line(
- b'0000000000000000000000000000000000000000 '
- b'49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij '
- b'<jelmer@jelmer.uk> 1446552482 +0000 '
- b'clone: from git://jelmer.uk/samba'))
+ parse_reflog_line(reflog_line))
THREES = b'3' * 40
FOURS = b'4' * 40
+
class PackedRefsFileTests(TestCase):
def test_split_ref_line_errors(self):
f = BytesIO(b'\n'.join([
ONES + b' ref/1',
b'^' + TWOS]))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
+ self.assertRaises(errors.PackedRefsException, list,
+ read_packed_refs(f))
def test_read_with_peeled(self):
f = BytesIO(b'\n'.join([
f = BytesIO(b'\n'.join([
b'^' + TWOS,
ONES + b' ref/1']))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
+ self.assertRaises(errors.PackedRefsException, list,
+ read_packed_refs(f))
f = BytesIO(b'\n'.join([
- ONES + b' ref/1',
- b'^' + TWOS,
- b'^' + THREES]))
- self.assertRaises(errors.PackedRefsException, list, read_packed_refs(f))
+ ONES + b' ref/1',
+ b'^' + TWOS,
+ b'^' + THREES]))
+ self.assertRaises(errors.PackedRefsException, list,
+ read_packed_refs(f))
def test_write_with_peeled(self):
f = BytesIO()
# Dict of refs that we expect all RefsContainerTests subclasses to define.
_TEST_REFS = {
b'HEAD': b'42d06bd4b77fed026b154d16493e5deab78f02ec',
- b'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa': b'42d06bd4b77fed026b154d16493e5deab78f02ec',
+ b'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa':
+ b'42d06bd4b77fed026b154d16493e5deab78f02ec',
b'refs/heads/master': b'42d06bd4b77fed026b154d16493e5deab78f02ec',
b'refs/heads/packed': b'42d06bd4b77fed026b154d16493e5deab78f02ec',
b'refs/tags/refs-0.1': b'df6800012397fb85c56e7418dd4eb9405dee075c',
self.assertEqual(_TEST_REFS, self._refs.as_dict())
def test_setitem(self):
- self._refs[b'refs/some/ref'] = b'42d06bd4b77fed026b154d16493e5deab78f02ec'
+ self._refs[b'refs/some/ref'] = (
+ b'42d06bd4b77fed026b154d16493e5deab78f02ec')
self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec',
self._refs[b'refs/some/ref'])
self.assertRaises(
self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec',
self._refs[b'HEAD'])
self.assertTrue(self._refs.remove_if_equals(
- b'refs/tags/refs-0.2', b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8'))
+ b'refs/tags/refs-0.2',
+ b'3ec9c43c84ff242e3ef4a9fc5bc111fd780a76a8'))
self.assertTrue(self._refs.remove_if_equals(
b'refs/tags/refs-0.2', ZERO_SHA))
self.assertFalse(b'refs/tags/refs-0.2' in self._refs)
self.assertEqual(b'ref: refs/heads/master', v)
# ensure the symbolic link was written through
- f = open(os.path.join(self._refs.path, 'refs', 'heads', 'master'), 'rb')
+ f = open(os.path.join(self._refs.path, 'refs', 'heads', 'master'),
+ 'rb')
self.assertEqual(ones, f.read()[:40])
f.close()
self.assertRaises(KeyError, lambda: self._refs[b'refs/tags/refs-0.1'])
def test_read_ref(self):
- self.assertEqual(b'ref: refs/heads/master', self._refs.read_ref(b'HEAD'))
+ self.assertEqual(b'ref: refs/heads/master',
+ self._refs.read_ref(b'HEAD'))
self.assertEqual(b'42d06bd4b77fed026b154d16493e5deab78f02ec',
self._refs.read_ref(b'refs/heads/packed'))
self.assertEqual(None, self._refs.read_ref(b'nonexistant'))
def test_non_ascii(self):
try:
- encoded_ref = u'refs/tags/schön'.encode(sys.getfilesystemencoding())
+ encoded_ref = u'refs/tags/schön'.encode(
+ sys.getfilesystemencoding())
except UnicodeEncodeError:
- raise SkipTest("filesystem encoding doesn't support special character")
+ raise SkipTest(
+ "filesystem encoding doesn't support special character")
p = os.path.join(self._repo.path, 'refs', 'tags', u'schön')
with open(p, 'w') as f:
f.write('00' * 20)
_TEST_REFS_SERIALIZED = (
- b'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n'
+ b'42d06bd4b77fed026b154d16493e5deab78f02ec\t'
+ b'refs/heads/40-char-ref-aaaaaaaaaaaaaaaaaa\n'
b'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/master\n'
b'42d06bd4b77fed026b154d16493e5deab78f02ec\trefs/heads/packed\n'
b'df6800012397fb85c56e7418dd4eb9405dee075c\trefs/tags/refs-0.1\n'
with open(os.path.join(r.path, 'a'), 'wb') as f:
f.write(b'file contents')
r.stage(['a'])
- commit_sha = r.do_commit(b'msg',
- committer=b'Test Committer <test@nodomain.com>',
- author=b'Test Author <test@nodomain.com>',
- commit_timestamp=12345, commit_timezone=0,
- author_timestamp=12345, author_timezone=0)
+ commit_sha = r.do_commit(
+ b'msg',
+ committer=b'Test Committer <test@nodomain.com>',
+ author=b'Test Author <test@nodomain.com>',
+ commit_timestamp=12345, commit_timezone=0,
+ author_timestamp=12345, author_timezone=0)
self.assertEqual([], r[commit_sha].parents)
self._root_commit = commit_sha
self.assertSucceeds(set_caps, [b'cap2', b'ignoreme'])
def test_has_capability(self):
- self.assertRaises(GitProtocolError, self._handler.has_capability, b'cap')
+ self.assertRaises(GitProtocolError, self._handler.has_capability,
+ b'cap')
caps = self._handler.capabilities()
self._handler.set_client_capabilities(caps)
for cap in caps:
caps = list(self._handler.required_capabilities()) + [b'include-tag']
self._handler.set_client_capabilities(caps)
self.assertEqual({b'1234' * 10: ONE, b'5678' * 10: TWO},
- self._handler.get_tagged(refs, repo=self._repo))
+ self._handler.get_tagged(refs, repo=self._repo))
# non-include-tag case
caps = self._handler.required_capabilities()
def required_capabilities(self):
return ()
+
class ReceivePackHandlerTestCase(TestCase):
def setUp(self):
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b'/': self._repo})
self._walker = ProtocolGraphWalker(
- TestUploadPackHandler(backend, [b'/', b'host=lolcats'], TestProto()),
- self._repo.object_store, self._repo.get_peeled)
+ TestUploadPackHandler(backend, [b'/', b'host=lolcats'],
+ TestProto()),
+ self._repo.object_store, self._repo.get_peeled)
def test_empty_repository(self):
# The server should wait for a flush packet.
self.assertEqual(None, self._walker.proto.get_received_line())
-
class ProtocolGraphWalkerTestCase(TestCase):
def setUp(self):
self._repo = MemoryRepo.init_bare(commits, {})
backend = DictBackend({b'/': self._repo})
self._walker = ProtocolGraphWalker(
- TestUploadPackHandler(backend, [b'/', b'host=lolcats'], TestProto()),
- self._repo.object_store, self._repo.get_peeled)
+ TestUploadPackHandler(backend, [b'/', b'host=lolcats'],
+ TestProto()),
+ self._repo.object_store, self._repo.get_peeled)
def test_all_wants_satisfied_no_haves(self):
self._walker.set_wants([ONE])
def test_split_proto_line(self):
allowed = (b'want', b'done', None)
self.assertEqual((b'want', ONE),
- _split_proto_line(b'want ' + ONE + b'\n', allowed))
+ _split_proto_line(b'want ' + ONE + b'\n', allowed))
self.assertEqual((b'want', TWO),
- _split_proto_line(b'want ' + TWO + b'\n', allowed))
+ _split_proto_line(b'want ' + TWO + b'\n', allowed))
self.assertRaises(GitProtocolError, _split_proto_line,
b'want xxxx\n', allowed)
self.assertRaises(UnexpectedCommandError, _split_proto_line,
self.assertRaises(GitProtocolError, _split_proto_line,
b'foo ' + FOUR + b'\n', allowed)
self.assertRaises(GitProtocolError, _split_proto_line, b'bar', allowed)
- self.assertEqual((b'done', None), _split_proto_line(b'done\n', allowed))
+ self.assertEqual((b'done', None),
+ _split_proto_line(b'done\n', allowed))
self.assertEqual((None, None), _split_proto_line(b'', allowed))
def test_determine_wants(self):
self._walker.advertise_refs = False
self._walker.proto.set_output([b'want ' + FOUR + b' multi_ack', None])
- self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
+ self.assertRaises(GitProtocolError, self._walker.determine_wants,
+ heads)
self._walker.proto.set_output([None])
self.assertEqual([], self._walker.determine_wants(heads))
- self._walker.proto.set_output([b'want ' + ONE + b' multi_ack', b'foo', None])
- self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
+ self._walker.proto.set_output(
+ [b'want ' + ONE + b' multi_ack', b'foo', None])
+ self.assertRaises(GitProtocolError, self._walker.determine_wants,
+ heads)
self._walker.proto.set_output([b'want ' + FOUR + b' multi_ack', None])
- self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
+ self.assertRaises(GitProtocolError, self._walker.determine_wants,
+ heads)
def test_determine_wants_advertisement(self):
self._walker.proto.set_output([None])
return
# Whether or not PACK is sent after is determined by this, so
# record this value.
- self.pack_sent = self._impl.handle_done(self.done_required,
- self.done_received)
+ self.pack_sent = self._impl.handle_done(
+ self.done_required, self.done_received)
return self.pack_sent
def notify_done(self):
self.backend = FileSystemBackend()
def test_nonexistant(self):
- self.assertRaises(NotGitRepository,
- self.backend.open_repository, "/does/not/exist/unless/foo")
+ self.assertRaises(NotGitRepository, self.backend.open_repository,
+ "/does/not/exist/unless/foo")
def test_absolute(self):
repo = self.backend.open_repository(self.path)
os.path.normcase(os.path.abspath(self.repo.path)))
def test_child(self):
- self.assertRaises(NotGitRepository,
- self.backend.open_repository, os.path.join(self.path, "foo"))
+ self.assertRaises(
+ NotGitRepository,
+ self.backend.open_repository, os.path.join(self.path, "foo"))
def test_bad_repo_path(self):
backend = FileSystemBackend()
def test_nonexistant(self):
repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b'/': repo})
- self.assertRaises(NotGitRepository,
- backend.open_repository, "/does/not/exist/unless/foo")
+ self.assertRaises(
+ NotGitRepository, backend.open_repository,
+ "/does/not/exist/unless/foo")
def test_bad_repo_path(self):
repo = MemoryRepo.init_bare([], {})
self.backend = DictBackend({})
def serve_command(self, handler_cls, args, inf, outf):
- return serve_command(handler_cls, [b"test"] + args, backend=self.backend,
- inf=inf, outf=outf)
+ return serve_command(
+ handler_cls, [b"test"] + args, backend=self.backend, inf=inf,
+ outf=outf)
def test_receive_pack(self):
commit = make_commit(id=ONE, parents=[], commit_time=111)
self.backend.repos[b"/"] = MemoryRepo.init_bare(
[commit], {b"refs/heads/master": commit.id})
outf = BytesIO()
- exitcode = self.serve_command(ReceivePackHandler, [b"/"], BytesIO(b"0000"), outf)
+ exitcode = self.serve_command(ReceivePackHandler, [b"/"],
+ BytesIO(b"0000"), outf)
outlines = outf.getvalue().splitlines()
self.assertEqual(2, len(outlines))
- self.assertEqual(b"1111111111111111111111111111111111111111 refs/heads/master",
- outlines[0][4:].split(b"\x00")[0])
+ self.assertEqual(
+ b"1111111111111111111111111111111111111111 refs/heads/master",
+ outlines[0][4:].split(b"\x00")[0])
self.assertEqual(b"0000", outlines[-1])
self.assertEqual(0, exitcode)
update_server_info(self.repo)
with open(os.path.join(self.path, ".git", "info", "refs"), 'rb') as f:
self.assertEqual(b'', f.read())
- with open(os.path.join(self.path, ".git", "objects", "info", "packs"), 'rb') as f:
+ p = os.path.join(self.path, ".git", "objects", "info", "packs")
+ with open(p, 'rb') as f:
self.assertEqual(b'', f.read())
def test_simple(self):
update_server_info(self.repo)
with open(os.path.join(self.path, ".git", "info", "refs"), 'rb') as f:
self.assertEqual(f.read(), commit_id + b'\trefs/heads/foo\n')
- with open(os.path.join(self.path, ".git", "objects", "info", "packs"), 'rb') as f:
+ p = os.path.join(self.path, ".git", "objects", "info", "packs")
+ with open(p, 'rb') as f:
self.assertEqual(f.read(), b'')
)
from dulwich.diff_tree import (
- CHANGE_ADD,
CHANGE_MODIFY,
CHANGE_RENAME,
TreeChange,
2, trees={1: [(b'a', blob_a1)],
2: [(b'a', blob_a2), (b'b', blob_b2)]})
e1 = TestWalkEntry(c1, [TreeChange.add((b'a', F, blob_a1.id))])
- e2 = TestWalkEntry(c2, [TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id),
+ e2 = TestWalkEntry(
+ c2,
+ [TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id),
(b'a', F, blob_a2.id)),
- TreeChange.add((b'b', F, blob_b2.id))])
+ TreeChange.add((b'b', F, blob_b2.id))])
self.assertWalkYields([e2, e1], [c2.id])
def test_changes_multiple_parents(self):
3: [(b'a', blob_a3), (b'b', blob_b2)]})
# a is a modify/add conflict and b is not conflicted.
changes = [[
- TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id), (b'a', F, blob_a3.id)),
- TreeChange.add((b'a', F, blob_a3.id)),
+ TreeChange(CHANGE_MODIFY,
+ (b'a', F, blob_a1.id), (b'a', F, blob_a3.id)),
+ TreeChange.add((b'a', F, blob_a3.id)),
]]
self.assertWalkYields([TestWalkEntry(c3, changes)], [c3.id],
exclude=[c1.id, c2.id])
c1, c2, c3, c4, c5, c6 = self.make_linear_commits(6, trees=trees)
self.assertWalkYields([c5], [c6.id], paths=[b'c'])
- e = lambda n: (n, F, blob.id)
+ def e(n):
+ return (n, F, blob.id)
self.assertWalkYields(
[TestWalkEntry(c5, [TreeChange(CHANGE_RENAME, e(b'b'), e(b'c'))]),
TestWalkEntry(c3, [TreeChange(CHANGE_RENAME, e(b'a'), e(b'b'))]),
5: [(b'a', blob)],
6: [(b'c', blob)]})
- e = lambda n: (n, F, blob.id)
+ def e(n):
+ return (n, F, blob.id)
# Once the path changes to b, we aren't interested in a or c anymore.
self.assertWalkYields(
[TestWalkEntry(c6, [TreeChange(CHANGE_RENAME, e(b'a'), e(b'c'))]),
11, times=[9, 0, 1, 2, 3, 4, 5, 8, 6, 7, 9])
c8, _, c10, c11 = commits[-4:]
del self.store[commits[0].id]
- # c9 is older than we want to walk, but is out of order with its parent,
- # so we need to walk past it to get to c8.
+ # c9 is older than we want to walk, but is out of order with its
+ # parent, so we need to walk past it to get to c8.
# c1 would also match, but we've deleted it, and it should get pruned
# even with over-scanning.
self.assertWalkYields([c11, c10, c8], [c11.id], since=7)
entry_b = (b'y/b', F, blob_b.id)
entry_b2 = (b'y/b', F, blob_b2.id)
self.assertEqual(
- [[TreeChange(CHANGE_MODIFY, entry_a, entry_a2),
- TreeChange.add(entry_a2)],
- [TreeChange.add(entry_b2),
- TreeChange(CHANGE_MODIFY, entry_b, entry_b2)]],
- changes,
+ [[TreeChange(CHANGE_MODIFY, entry_a, entry_a2),
+ TreeChange.add(entry_a2)],
+ [TreeChange.add(entry_b2),
+ TreeChange(CHANGE_MODIFY, entry_b, entry_b2)]],
+ changes,
)
def test_filter_changes(self):
(but defunct). See https://github.com/jonashaag/klaus/issues/154.
"""
zstream, zlength = self._get_zstream(self.example_text)
- self._test_call(self.example_text,
- MinimalistWSGIInputStream2(zstream.read()), zlength)
+ self._test_call(
+ self.example_text,
+ MinimalistWSGIInputStream2(zstream.read()), zlength)
create_delta,
)
from dulwich.repo import Repo
-from dulwich.tests import (
- SkipTest,
+from dulwich.tests import ( # noqa: F401
skipIf,
+ SkipTest,
)
"""Make an object for testing and assign some members.
This method creates a new subclass to allow arbitrary attribute
- reassignment, which is not otherwise possible with objects having __slots__.
+ reassignment, which is not otherwise possible with objects having
+ __slots__.
:param attrs: dict of attributes to set on the new object.
:return: A newly initialized object of type cls.
class TestObject(cls):
"""Class that inherits from the given class, but without __slots__.
- Note that classes with __slots__ can't have arbitrary attributes monkey-
- patched in, so this is a class that is exactly the same only with a
- __dict__ instead of __slots__.
+ Note that classes with __slots__ can't have arbitrary attributes
+ monkey-patched in, so this is a class that is exactly the same only
+ with a __dict__ instead of __slots__.
"""
pass
TestObject.__name__ = 'TestObject_' + cls.__name__
:param object_store: An ObjectStore to commit objects to.
:param commit_spec: An iterable of iterables of ints defining the commit
- graph. Each entry defines one commit, and entries must be in topological
- order. The first element of each entry is a commit number, and the
- remaining elements are its parents. The commit numbers are only
+ graph. Each entry defines one commit, and entries must be in
+ topological order. The first element of each entry is a commit number,
+ and the remaining elements are its parents. The commit numbers are only
meaningful for the call to make_commits; since real commit objects are
created, they will get created with real, opaque SHAs.
:param trees: An optional dict of commit number -> tree spec for building
- trees for commits. The tree spec is an iterable of (path, blob, mode) or
- (path, blob) entries; if mode is omitted, it defaults to the normal file
- mode (0100644).
+ trees for commits. The tree spec is an iterable of (path, blob, mode)
+ or (path, blob) entries; if mode is omitted, it defaults to the normal
+ file mode (0100644).
:param attrs: A dict of commit number -> (dict of attribute -> value) for
assigning additional values to the commits.
:return: The list of commit objects created.
MissingCommitError,
)
from dulwich.objects import (
- Commit,
Tag,
)
use to filter changes. Must be a directory name. Must be
a full, valid, path reference (no partial names or wildcards).
:return: For commits with up to one parent, a list of TreeChange
- objects; if the commit has no parents, these will be relative to the
- empty tree. For merge commits, a list of lists of TreeChange
+ objects; if the commit has no parents, these will be relative to
+ the empty tree. For merge commits, a list of lists of TreeChange
objects; see dulwich.diff.tree_changes_for_merge.
"""
cached = self._changes.get(path_prefix)
parent = self._store[subtree_sha]
else:
changes_func = tree_changes_for_merge
- parent = [self._store[p].tree for p in self._get_parents(commit)]
+ parent = [
+ self._store[p].tree for p in self._get_parents(commit)]
if path_prefix:
parent_trees = [self._store[p] for p in parent]
parent = []
for _, c in self._pq):
_, n = self._pq[0]
if self._last and n.commit_time >= self._last.commit_time:
- # If the next commit is newer than the last one, we need
- # to keep walking in case its parents (which we may not
- # have seen yet) are excluded. This gives the excluded
- # set a chance to "catch up" while the commit is still
- # in the Walker's output queue.
+ # If the next commit is newer than the last one, we
+ # need to keep walking in case its parents (which we
+ # may not have seen yet) are excluded. This gives the
+ # excluded set a chance to "catch up" while the commit
+ # is still in the Walker's output queue.
reset_extra_commits = True
else:
reset_extra_commits = False
if (self._min_time is not None and
- commit.commit_time < self._min_time):
+ commit.commit_time < self._min_time):
# We want to stop walking at min_time, but commits at the
- # boundary may be out of order with respect to their parents. So
- # we walk _MAX_EXTRA_COMMITS more commits once we hit this
+ # boundary may be out of order with respect to their parents.
+ # So we walk _MAX_EXTRA_COMMITS more commits once we hit this
# boundary.
reset_extra_commits = False
ancestors.
:param exclude: Iterable of SHAs of commits to exclude along with their
ancestors, overriding includes.
- :param order: ORDER_* constant specifying the order of results. Anything
- other than ORDER_DATE may result in O(n) memory usage.
+ :param order: ORDER_* constant specifying the order of results.
+ Anything other than ORDER_DATE may result in O(n) memory usage.
:param reverse: If True, reverse the order of output, requiring O(n)
memory.
:param max_entries: The maximum number of entries to yield, or None for
"""Determine if a walk entry should be returned..
:param entry: The WalkEntry to consider.
- :return: True if the WalkEntry should be returned by this walk, or False
- otherwise (e.g. if it doesn't match any requested paths).
+ :return: True if the WalkEntry should be returned by this walk, or
+ False otherwise (e.g. if it doesn't match any requested paths).
"""
commit = entry.commit
if self.since is not None and commit.commit_time < self.since:
:param results: An iterator of WalkEntry objects, in the order returned
from the queue_cls.
- :return: An iterator or list of WalkEntry objects, in the order required
- by the Walker.
+ :return: An iterator or list of WalkEntry objects, in the order
+ required by the Walker.
"""
if self.order == ORDER_TOPO:
results = _topo_reorder(results, self.get_parents)
('GET', re.compile('/objects/info/alternates$')): get_text_file,
('GET', re.compile('/objects/info/http-alternates$')): get_text_file,
('GET', re.compile('/objects/info/packs$')): get_info_packs,
- ('GET', re.compile('/objects/([0-9a-f]{2})/([0-9a-f]{38})$')): get_loose_object,
- ('GET', re.compile('/objects/pack/pack-([0-9a-f]{40})\\.pack$')): get_pack_file,
- ('GET', re.compile('/objects/pack/pack-([0-9a-f]{40})\\.idx$')): get_idx_file,
+ ('GET', re.compile('/objects/([0-9a-f]{2})/([0-9a-f]{38})$')):
+ get_loose_object,
+ ('GET', re.compile('/objects/pack/pack-([0-9a-f]{40})\\.pack$')):
+ get_pack_file,
+ ('GET', re.compile('/objects/pack/pack-([0-9a-f]{40})\\.idx$')):
+ get_idx_file,
('POST', re.compile('/git-upload-pack$')): handle_service_request,
('POST', re.compile('/git-receive-pack$')): handle_service_request,