1 # Copyright (C) 2006 Jelmer Vernooij <jelmer@samba.org>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 from bzrlib.errors import NoSuchRevision, BzrError, NotBranchError
18 from bzrlib.progress import ProgressBar, DummyProgress
19 from bzrlib.trace import mutter
23 from svn.core import SubversionException
24 from transport import SvnRaTransport
27 from bsddb import dbshelve as shelve
31 def _escape_commit_message(message):
32 """Replace xml-incompatible control characters."""
34 # FIXME: RBC 20060419 this should be done by the revision
35 # serialiser not by commit. Then we can also add an unescaper
36 # in the deserializer and start roundtripping revision messages
37 # precisely. See repository_implementations/test_repository.py
39 # Python strings can include characters that can't be
40 # represented in well-formed XML; escape characters that
41 # aren't listed in the XML specification
42 # (http://www.w3.org/TR/REC-xml/#NT-Char).
44 u'[^\x09\x0A\x0D\u0020-\uD7FF\uE000-\uFFFD]+',
45 lambda match: match.group(0).encode('unicode_escape'),
class NotSvnBranchPath(BzrError):
    """Error raised when a path is not a valid Subversion branch path."""

    # Message template.  Presumably expanded by BzrError against the
    # instance attributes (here ``branch_path``) -- confirm in bzrlib.errors.
    _fmt = """{%(branch_path)s} is not a valid Svn branch path"""

    def __init__(self, branch_path):
        """Record the offending path.

        :param branch_path: Path that was rejected as a branch path.
        """
        BzrError.__init__(self)
        self.branch_path = branch_path
58 class LogWalker(object):
59 """Easy way to access the history of a Subversion repository."""
def __init__(self, scheme, transport=None, cache_dir=None, last_revnum=None, pb=None):
    """Create a new instance.

    :param scheme: Branching scheme to use.
    :param transport: SvnRaTransport to use to access the repository.
    :param cache_dir: Optional cache directory to use. Doesn't cache if
        none was specified.
    :param last_revnum: Last known revnum in the repository. Will be
        determined if not specified.
    :param pb: Progress bar to report progress to. Not used by the
        constructor itself.
    """
    assert isinstance(transport, SvnRaTransport)

    if last_revnum is None:
        last_revnum = transport.get_latest_revnum()

    self.last_revnum = last_revnum

    # A clone of the transport is kept, not the object that was passed in.
    self.transport = transport.clone()
    # The history methods call self.scheme.is_branch() / unprefix(), so the
    # scheme has to be remembered on the instance.
    self.scheme = scheme

    if cache_dir is not None:
        cache_file = os.path.join(cache_dir, 'log-v2')
        # ``shelves`` is defined outside this block; it is used here as a
        # registry so each cache file is opened at most once.
        if cache_file not in shelves:
            shelves[cache_file] = shelve.open(cache_file)
        self.revisions = shelves[cache_file]
    else:
        # No cache directory given: keep the revision log in memory only.
        self.revisions = {}
    # Highest revision number already present in the revision store.
    self.saved_revnum = max(len(self.revisions)-1, 0)
90 def fetch_revisions(self, to_revnum, pb=None):
91 """Fetch information about all revisions in the remote repository
94 :param to_revnum: End of range to fetch information for
95 :param pb: Optional progress bar to use
97 def rcvr(orig_paths, rev, author, date, message, pool):
98 pb.update('fetching svn revision info', rev, to_revnum)
100 if orig_paths is None:
103 copyfrom_path = orig_paths[p].copyfrom_path
105 copyfrom_path = copyfrom_path.strip("/")
106 paths[p.strip("/")] = (orig_paths[p].action,
107 copyfrom_path, orig_paths[p].copyfrom_rev)
109 self.revisions[str(rev)] = {
115 self.saved_revnum = rev
117 to_revnum = max(self.last_revnum, to_revnum)
119 # Don't bother for only a few revisions
120 if abs(self.saved_revnum-to_revnum) < 10:
127 mutter('getting log %r:%r' % (self.saved_revnum, to_revnum))
128 self.transport.get_log(["/"], self.saved_revnum, to_revnum,
132 except SubversionException, (_, num):
133 if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
134 raise NoSuchRevision(branch=self,
135 revision="Revision number %d" % to_revnum)
def follow_history(self, branch_path, revnum):
    """Return iterator over all the revisions between revnum and
    0 that touch branch_path.

    Revisions are visited from revnum downwards.  When a revision records
    that the branch path itself was copied from elsewhere, the walk
    continues at the copy source path and copy source revision.

    :param branch_path: Branch path to start reporting (in revnum)
    :param revnum: Start revision.
    :returns: Generator of ``(branch_path, changed_paths, revnum)`` tuples;
        ``changed_paths`` maps each touched path to its stored
        ``(action, copyfrom_path, copyfrom_rev)`` entry.
    :raises NotSvnBranchPath: if branch_path is given but the scheme does
        not accept it as a branch.
    """
    # NOTE(review): this precondition is reconstructed; confirm.
    assert revnum >= 0

    if revnum == 0 and branch_path in (None, ""):
        return

    if branch_path is not None and not self.scheme.is_branch(branch_path):
        raise NotSvnBranchPath(branch_path)

    if branch_path:
        branch_path = branch_path.strip("/")

    if revnum > self.saved_revnum:
        self.fetch_revisions(revnum)

    # After following a copy, revisions are skipped until this one.
    continue_revnum = None
    for i in range(revnum, 0, -1):
        if not (continue_revnum is None or continue_revnum == i):
            continue

        continue_revnum = None

        rev = self.revisions[str(i)]
        changed_paths = {}
        for p in rev['paths']:
            if (branch_path is None or
                p == branch_path or
                branch_path == "" or
                p.startswith(branch_path+"/")):
                try:
                    (bp, rp) = self.scheme.unprefix(p)
                except NotBranchError:
                    # unprefix() rejected this path; leave it out.
                    continue
                changed_paths.setdefault(bp, {})[p] = rev['paths'][p]

        assert branch_path is None or len(changed_paths) <= 1

        for bp in changed_paths:
            yield (bp, changed_paths[bp], i)

        if (branch_path is not None and
            branch_path in rev['paths'] and
            rev['paths'][branch_path][1] is not None):
            # In this revision, this branch was copied from
            # somewhere else
            # FIXME: What if copyfrom_path is not a branch path?
            continue_revnum = rev['paths'][branch_path][2]
            branch_path = rev['paths'][branch_path][1]
def find_branches(self, revnum):
    """Find all branches that were changed in the specified revision number.

    Revisions 0..revnum are scanned in ascending order.  Revision 0 is
    treated as adding the repository root ('').

    NOTE(review): the layout of the yielded tuples --
    ``(path, revnum, still_exists)`` -- is reconstructed; confirm against
    the callers before relying on it.

    :param revnum: Revision to search for branches.
    :returns: Generator yielding one tuple for every branch removal or
        replacement seen, followed by one tuple for every branch still
        recorded at revnum.
    """
    created_branches = {}

    if revnum > self.saved_revnum:
        self.fetch_revisions(revnum)

    for i in range(revnum+1):
        if i == 0:
            paths = {'': ('A', None, None)}
        else:
            paths = self.revisions[str(i)]['paths']
        for p in paths:
            if self.scheme.is_branch(p):
                if paths[p][0] in ('R', 'D'):
                    # The branch may never have been recorded (no 'A'
                    # entry was seen for it), so don't insist on the key.
                    created_branches.pop(p, None)
                    yield (p, i, False)

                if paths[p][0] in ('A', 'R'):
                    created_branches[p] = i

    for p in created_branches:
        yield (p, revnum, True)
def get_revision_info(self, revnum, pb=None):
    """Obtain basic information for a specific revision.

    :param revnum: Revision number.
    :param pb: Optional progress bar, handed on to fetch_revisions() when
        the revision still has to be fetched.
    :returns: Tuple with author, log message, date and changed paths of
        the revision.  The log message is passed through
        _escape_commit_message().
    """
    if revnum > self.saved_revnum:
        self.fetch_revisions(revnum, pb)
    rev = self.revisions[str(revnum)]
    # NOTE(review): the source special-cased a missing author, but the
    # value it substituted is not visible; the stored value (possibly
    # None) is passed through unchanged here -- confirm.
    author = rev['author']
    return (author,
            _escape_commit_message(rev['message']),
            rev['date'], rev['paths'])
def find_latest_change(self, path, revnum):
    """Find latest revision that touched path.

    :param path: Path to check for changes
    :param revnum: First revision to check
    :returns: The highest revision number not above revnum in which path
        was changed, or 0 if none of the revisions revnum..1 touched it.
    """
    # Step backwards until a revision that changed the path is found.
    while revnum > 0 and not self.touches_path(path, revnum):
        revnum -= 1
    return revnum
def touches_path(self, path, revnum):
    """Check whether path was changed in specified revision.

    The revision is fetched first when it lies beyond the revisions
    already stored.

    :param path: Path to check
    :param revnum: Revision to check
    :returns: True if path is among the paths recorded for revnum.
    """
    not_yet_stored = revnum > self.saved_revnum
    if not_yet_stored:
        self.fetch_revisions(revnum)
    changed = self.revisions[str(revnum)]['paths']
    return path in changed
266 def find_children(self, path, revnum):
267 """Find all children of path in revnum."""
268 # TODO: Find children by walking history, or use
270 mutter("svn ls -r %d '%r'" % (revnum, path))
273 (dirents, _, _) = self.transport.get_dir(
274 "/" + path.encode('utf8'), revnum)
275 except SubversionException, (_, num):
276 if num == svn.core.SVN_ERR_FS_NOT_DIRECTORY:
281 yield os.path.join(path, p)
282 for c in self.find_children(os.path.join(path, p), revnum):