format changes.
-
Future Enhancements
-------------------
create unique index if not exists path_rev_path_action on changed_path(rev, path, action);
""")
- def find_latest_change(self, path, revnum, include_parents=False,
- include_children=False):
+ def find_latest_change(self, path, revnum, include_children=False):
"""Find latest revision that touched path.
:param path: Path to check for changes
extra += " OR path LIKE '%'"
else:
extra += " OR path LIKE '%s/%%'" % path.strip("/")
- if include_parents:
- extra += " OR ('%s' LIKE (path || '/%%') AND (action = 'R' OR action = 'A'))" % path.strip("/")
+ extra += " OR ('%s' LIKE (path || '/%%') AND (action = 'R' OR action = 'A'))" % path.strip("/")
+
query = "SELECT rev FROM changed_path WHERE (path='%s'%s) AND rev <= %d ORDER BY rev DESC LIMIT 1" % (path.strip("/"), extra, revnum)
row = self.cachedb.execute(query).fetchone()
self.cachedb.commit()
+def struct_revpaths_to_tuples(changed_paths):
+ assert isinstance(changed_paths, dict)
+ revpaths = {}
+ for k,v in changed_paths.items():
+ if v.copyfrom_path is None:
+ copyfrom_path = None
+ else:
+ copyfrom_path = v.copyfrom_path.strip("/")
+ revpaths[k.strip("/")] = (v.action, copyfrom_path, v.copyfrom_rev)
+ return revpaths
+
+
class LogWalker(object):
"""Easy way to access the history of a Subversion repository."""
def __init__(self, transport, limit=None):
self._transport = SvnRaTransport(self.url)
return self._transport
- def find_latest_change(self, path, revnum, include_parents=False,
- include_children=False):
+ def find_latest_change(self, path, revnum, include_children=False):
"""Find latest revision that touched path.
:param path: Path to check for changes
"""
assert isinstance(path, str)
assert isinstance(revnum, int) and revnum >= 0
- return self._get_transport().iter_log(path, revnum, revnum, 1, True, True, []).next()[1]
+
+ try:
+ return self._get_transport().iter_log(path, revnum, 0, 2, True, False, []).next()[1]
+ except SubversionException, (_, num):
+ if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
+ raise NoSuchRevision(branch=self,
+ revision="Revision number %d" % revnum)
+ if num == svn.core.SVN_ERR_FS_NOT_FOUND:
+ return None
+ raise
def iter_changes(self, path, revnum, limit=0):
"""Return iterator over all the revisions between revnum and 0 named path or inside path.
assert revnum >= 0
try:
- revs = self._get_transport().iter_log(path, revnum, 0, limit, True, True, [])
+ for (changed_paths, revnum, known_revprops) in self._get_transport().iter_log(path, revnum, 0, limit, True, False, []):
+ if revnum == 0 and changed_paths is None:
+ revpaths = {"": ('A', None, -1)}
+ else:
+ assert isinstance(changed_paths, dict), "invalid paths in %r:%r" % (revnum, path)
+ revpaths = struct_revpaths_to_tuples(changed_paths)
+ next = changes.find_prev_location(revpaths, path, revnum)
+ revprops = lazy_dict(known_revprops, self._get_transport().revprop_list, revnum)
+ yield (path, revpaths, revnum, revprops)
+ if next is None:
+ break
+ path = next[0]
except SubversionException, (_, num):
if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
raise NoSuchRevision(branch=self,
revision="Revision number %d" % revnum)
raise
-
- for (changed_paths, revnum, known_revprops) in revs:
- revpaths = {}
- for k,v in changed_paths.items():
- revpaths[k.strip("/")] = (v.action, v.copyfrom_path, v.copyfrom_rev)
- next = changes.find_prev_location(revpaths, path, revnum)
- revprops = lazy_dict(known_revprops, self._get_transport().revprop_list, revnum)
- yield (path, revpaths, revnum, revprops)
- path = next[0]
def get_revision_paths(self, revnum):
"""Obtain dictionary with all the changes in a particular revision.
# To make the existing code happy:
if revnum == 0:
return {'': ('A', None, -1)}
- return self._get_transport().iter_log("", revnum, revnum, 1, True, True, []).next()[0]
+
+ try:
+ return struct_revpaths_to_tuples(
+ self._get_transport().iter_log("", revnum, revnum, 1, True, True, []).next()[0])
+ except SubversionException, (_, num):
+ if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
+ raise NoSuchRevision(branch=self,
+ revision="Revision number %d" % revnum)
+ raise
def find_children(self, path, revnum):
"""Find all children of path in revnum.
:param revnum: Revision to check
"""
assert revnum >= 0
- raise NotImplementedError
+ if revnum == 0:
+ return (None, -1)
+
+ try:
+ paths = struct_revpaths_to_tuples(self._get_transport().iter_log(path, revnum, revnum, 1, True, False, []).next()[0])
+ except SubversionException, (_, num):
+ if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
+ raise NoSuchRevision(branch=self,
+ revision="Revision number %d" % revnum)
+ raise
+
+ if not path in paths:
+ return (None, -1)
+
+ if paths[path][2] == -1:
+ if paths[path][0] == 'A':
+ return (None, -1)
+ return (path, revnum-1)
+
+ return (paths[path][1], paths[path][2])
+
def revision_parents(self, revision_id, svn_fileprops=None, svn_revprops=None):
"""See Repository.revision_parents()."""
+ assert isinstance(revision_id, str)
(branch, revnum, mapping) = self.lookup_revision_id(revision_id)
mainline_parent = self.lhs_revision_parent(branch, revnum, mapping)
if mainline_parent is None:
else:
prev_path = p
prev_rev = self._log.find_latest_change(p,
- i-1, include_parents=True,
- include_children=True)
+ i-1, include_children=True)
assert isinstance(prev_rev, int)
ret.append((prev_path, prev_rev, False))
if c.startswith(p+"/") and c in created_branches:
del created_branches[c]
j = self._log.find_latest_change(c, i-1,
- include_parents=True,
include_children=True)
assert isinstance(j, int)
ret.append((c, j, False))
pb.update("determining branch last changes",
i, len(created_branches))
j = self._log.find_latest_change(p, to_revnum,
- include_parents=True,
include_children=True)
if j is None:
j = created_branches[p]
import os
import logwalker
+from bzrlib import debug
from tests import TestCaseWithSubversionRepository
from transport import SvnRaTransport
class TestLogWalker(TestCaseWithSubversionRepository):
def setUp(self):
super(TestLogWalker, self).setUp()
+ debug.debug_flags.add("transport")
def get_log_walker(self, transport):
return logwalker.LogWalker(transport)
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(0, walker.find_latest_change("", 1, include_parents=True))
+ self.assertEqual(0, walker.find_latest_change("", 1))
def test_find_latest_children_root(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(2, walker.find_latest_change("tags/tmp/foo", 2,
- include_parents=True))
+ self.assertEqual(2, walker.find_latest_change("tags/tmp/foo", 2))
def test_find_latest_parent_just_modify(self):
repos_url = self.make_client("a", "dc")
self.client_commit("dc", "My Message3")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(2, walker.find_latest_change("tags/tmp/foo", 3,
- include_parents=True))
+ self.assertEqual(2, walker.find_latest_change("tags/tmp/foo", 3))
def test_find_latest_parentmoved(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertIs(2, walker.find_latest_change("bla/tmp", 2,
- include_parents=True))
+ self.assertIs(2, walker.find_latest_change("bla/tmp", 2))
def test_find_latest_nonexistant(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertIs(None, walker.find_latest_change("bloe", 2, include_parents=True))
- self.assertIs(None, walker.find_latest_change("bloe/bla", 2, include_parents=True))
+ self.assertIs(None, walker.find_latest_change("bloe", 2))
+ self.assertIs(None, walker.find_latest_change("bloe/bla", 2))
def test_find_latest_change(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(1, walker.find_latest_change("branches", 1, include_parents=True))
+ self.assertEqual(1, walker.find_latest_change("branches", 1))
def test_find_latest_change_children(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(1, walker.find_latest_change("branches", 2, include_parents=True))
+ self.assertEqual(1, walker.find_latest_change("branches", 2))
def test_find_latest_change_prop(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(2, walker.find_latest_change("branches", 3, include_parents=True))
+ self.assertEqual(2, walker.find_latest_change("branches", 3))
def test_find_latest_change_file(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(3, walker.find_latest_change("branches/foo", 3, include_parents=True))
+ self.assertEqual(3, walker.find_latest_change("branches/foo", 3))
def test_find_latest_change_newer(self):
repos_url = self.make_client("a", "dc")
walker = self.get_log_walker(transport=SvnRaTransport(repos_url))
- self.assertEqual(2, walker.find_latest_change("branches/foo", 2, include_parents=True))
+ self.assertEqual(2, walker.find_latest_change("branches/foo", 2))
def test_follow_history_branch_replace(self):
repos_url = self.make_client("a", "dc")
def iter_log(self, path, from_revnum, to_revnum, limit, discover_changed_paths,
strict_node_history, revprops):
+
+ assert isinstance(path, str)
+ assert isinstance(from_revnum, int) and isinstance(to_revnum, int)
+ assert isinstance(limit, int)
from threading import Thread, Semaphore
class logfetcher(Thread):
def next(self):
self.semaphore.acquire()
- ret = self.pending.pop()
+ ret = self.pending.pop(0)
if isinstance(ret, Exception):
raise ret
return ret