Fix #76280 and add some docstrings.
[jelmer/subvertpy.git] / logwalker.py
1 # Copyright (C) 2006 Jelmer Vernooij <jelmer@samba.org>
2
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17 from bzrlib.errors import NoSuchRevision, BzrError, NotBranchError
18 from bzrlib.progress import ProgressBar, DummyProgress
19 from bzrlib.trace import mutter
20
21 import os
22
23 from svn.core import SubversionException
24 from transport import SvnRaTransport
25 import svn.core
26
27 from bsddb import dbshelve as shelve
28
# Registry of opened log cache shelves, keyed by cache file path, so that
# every LogWalker created with the same cache file reuses one open handle
# instead of opening the file again.
shelves = {}
30
31 def _escape_commit_message(message):
32     """Replace xml-incompatible control characters."""
33     import re
34     # FIXME: RBC 20060419 this should be done by the revision
35     # serialiser not by commit. Then we can also add an unescaper
36     # in the deserializer and start roundtripping revision messages
37     # precisely. See repository_implementations/test_repository.py
38     
39     # Python strings can include characters that can't be
40     # represented in well-formed XML; escape characters that
41     # aren't listed in the XML specification
42     # (http://www.w3.org/TR/REC-xml/#NT-Char).
43     message, _ = re.subn(
44         u'[^\x09\x0A\x0D\u0020-\uD7FF\uE000-\uFFFD]+',
45         lambda match: match.group(0).encode('unicode_escape'),
46         message)
47     return message
48
49
class NotSvnBranchPath(BzrError):
    """Error for a path that is not a valid Subversion branch path."""

    _fmt = "{%(branch_path)s} is not a valid Svn branch path"

    def __init__(self, branch_path):
        """Create the error.

        :param branch_path: The offending path; stored on the instance
                            under the name the _fmt template refers to.
        """
        BzrError.__init__(self)
        self.branch_path = branch_path
56
57
class LogWalker(object):
    """Easy way to access the history of a Subversion repository.

    Revision information is kept in ``self.revisions``, a mapping from
    ``str(revnum)`` to a dictionary with the keys 'paths', 'author', 'date'
    and 'message'.  'paths' maps every changed path (with leading and
    trailing slashes stripped) to a tuple
    ``(action, copyfrom_path, copyfrom_rev)``.
    """

    def __init__(self, scheme, transport=None, cache_dir=None, last_revnum=None, pb=None):
        """Create a new instance.

        :param scheme:  Branching scheme to use.
        :param transport:   SvnRaTransport to use to access the repository.
                            Must be given; the instance works on a clone
                            of it.
        :param cache_dir:   Optional cache directory to use. Doesn't cache if
                            not set.
        :param last_revnum: Last known revnum in the repository. Will be
                            determined if not specified.
        :param pb:          Progress bar to report progress to. Not used by
                            the constructor itself.
        """
        assert isinstance(transport, SvnRaTransport)

        if last_revnum is None:
            last_revnum = transport.get_latest_revnum()

        self.last_revnum = last_revnum

        self.transport = transport.clone()
        self.scheme = scheme

        if cache_dir is not None:
            cache_file = os.path.join(cache_dir, 'log-v2')
            # Share one open shelve per cache file between all instances.
            if cache_file not in shelves:
                shelves[cache_file] = shelve.open(cache_file)
            self.revisions = shelves[cache_file]
        else:
            self.revisions = {}
        # NOTE(review): presumably the cache holds revisions 0..N, making
        # len - 1 the highest cached revnum -- confirm.
        self.saved_revnum = max(len(self.revisions)-1, 0)

    def fetch_revisions(self, to_revnum, pb=None):
        """Fetch information about all revisions in the remote repository
        until to_revnum.

        Fetching starts at the last saved revision and always continues at
        least up to the last revision number known at construction time.

        :param to_revnum: End of range to fetch information for
        :param pb: Optional progress bar to use. If not given, one is
                   created here. It is cleared once fetching finishes.
        :raises NoSuchRevision: if the repository reports that the
                                requested revision does not exist.
        """
        import sys

        to_revnum = max(self.last_revnum, to_revnum)

        if pb is None:
            # Don't bother for only a few revisions
            if abs(self.saved_revnum-to_revnum) < 10:
                pb = DummyProgress()
            else:
                pb = ProgressBar()

        def rcvr(orig_paths, rev, author, date, message, pool):
            # Log callback: record a single revision in self.revisions.
            pb.update('fetching svn revision info', rev, to_revnum)
            paths = {}
            if orig_paths is None:
                orig_paths = {}
            for p in orig_paths:
                copyfrom_path = orig_paths[p].copyfrom_path
                if copyfrom_path:
                    copyfrom_path = copyfrom_path.strip("/")
                paths[p.strip("/")] = (orig_paths[p].action,
                            copyfrom_path, orig_paths[p].copyfrom_rev)

            self.revisions[str(rev)] = {
                    'paths': paths,
                    'author': author,
                    'date': date,
                    'message': message
                    }
            self.saved_revnum = rev

        try:
            try:
                mutter('getting log %r:%r' % (self.saved_revnum, to_revnum))
                self.transport.get_log(["/"], self.saved_revnum, to_revnum,
                               0, True, True, rcvr)
            finally:
                pb.clear()
        except SubversionException:
            # The exception arguments are (message, error number); fetched
            # through sys.exc_info() so that this handler parses on every
            # Python version.
            _, num = sys.exc_info()[1].args
            if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
                raise NoSuchRevision(branch=self,
                    revision="Revision number %d" % to_revnum)
            raise

    def follow_history(self, branch_path, revnum):
        """Return iterator over all the revisions between revnum and
        0 that touch branch_path.

        Yields tuples ``(branch_path, changed_paths, revnum)``, newest
        revision first.  When the branch turns out to have been copied from
        another path, the walk continues with that path at the revision it
        was copied from.

        :param branch_path:   Branch path to start reporting (in revnum).
                              None reports changes for every branch.
        :param revnum:        Start revision.
        :raises NotSvnBranchPath: if branch_path is not a branch according
                                  to the branching scheme.
        """
        assert revnum >= 0

        if branch_path is not None and not self.scheme.is_branch(branch_path):
            raise NotSvnBranchPath(branch_path)

        if branch_path:
            branch_path = branch_path.strip("/")

        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)

        continue_revnum = None
        # Walk backwards from revnum; revision 0 is never reported.
        for i in range(revnum, 0, -1):
            # After a copy, skip everything until the copy source revision.
            if continue_revnum is not None and continue_revnum != i:
                continue

            continue_revnum = None

            paths = self.revisions[str(i)]['paths']
            changed_paths = {}
            for p in paths:
                if (branch_path is None or
                    p == branch_path or
                    branch_path == "" or
                    p.startswith(branch_path+"/")):

                    try:
                        (bp, rp) = self.scheme.unprefix(p)
                    except NotBranchError:
                        # Path is not inside any branch; ignore it.
                        continue
                    changed_paths.setdefault(bp, {})[p] = paths[p]

            assert branch_path is None or len(changed_paths) <= 1

            for bp in changed_paths:
                yield (bp, changed_paths[bp], i)

            if (branch_path is not None and
                branch_path in paths and
                paths[branch_path][1] is not None):
                # In this revision, this branch was copied from
                # somewhere else
                # FIXME: What if copyfrom_path is not a branch path?
                continue_revnum = paths[branch_path][2]
                branch_path = paths[branch_path][1]

    def find_branches(self, revnum):
        """Find all branches that were created or removed up to and
        including the specified revision number.

        Yields ``(path, revnum, False)`` for every removal (or replacement)
        of a branch path, with the revision in which it happened, followed
        by ``(path, revnum, True)`` for every branch still present after
        the last revision searched.

        :param revnum: Last revision to search for branches.
        """
        created_branches = {}

        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)

        # Revision 0 is skipped; revnum itself is included.
        for i in range(1, revnum+1):
            paths = self.revisions[str(i)]['paths']
            for p in paths:
                if not self.scheme.is_branch(p):
                    continue

                action = paths[p][0]
                if action in ('R', 'D'):
                    # The branch may never have been recorded as created
                    # here, so don't insist on the key being present.
                    created_branches.pop(p, None)
                    yield (p, i, False)

                if action in ('A', 'R'):
                    created_branches[p] = i

        for p in created_branches:
            yield (p, revnum, True)

    def get_revision_info(self, revnum, pb=None):
        """Obtain basic information for a specific revision.

        :param revnum: Revision number.
        :param pb: Optional progress bar, used if the revision still has
                   to be fetched.
        :returns: Tuple with author, (escaped) log message, date and
                  changed paths of the revision.
        """
        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum, pb)
        rev = self.revisions[str(revnum)]
        return (rev['author'],
             _escape_commit_message(rev['message']),
             rev['date'], rev['paths'])

    def find_latest_change(self, path, revnum):
        """Find latest revision that touched path.

        Only changes to path itself are considered, not changes to paths
        below it.

        :param path: Path to check for changes
        :param revnum: First revision to check; the search goes backwards
                       from here.
        :returns: Revision number of the latest change, or 0 if none of
                  the checked revisions touched path.
        """
        while revnum > 0 and not self.touches_path(path, revnum):
            revnum = revnum - 1
        return revnum

    def touches_path(self, path, revnum):
        """Check whether path was changed in specified revision.

        :param path:  Path to check
        :param revnum:  Revision to check
        :returns: True if path is among the changed paths of revnum.
        """
        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)
        return (path in self.revisions[str(revnum)]['paths'])

    def find_children(self, path, revnum):
        """Find all children of path in revnum.

        Recursively yields the path of every entry below path.  Yields
        nothing if path is not a directory.

        :param path: Path of the directory to list.
        :param revnum: Revision in which to list it.
        """
        # Subversion paths always use "/" as separator, whatever the
        # platform, so don't build them with os.path.
        import posixpath
        import sys

        # TODO: Find children by walking history, or use
        # cache?
        mutter("svn ls -r %d '%r'" % (revnum, path))

        try:
            (dirents, _, _) = self.transport.get_dir(
                "/" + path.encode('utf8'), revnum)
        except SubversionException:
            # The exception arguments are (message, error number).
            _, num = sys.exc_info()[1].args
            if num == svn.core.SVN_ERR_FS_NOT_DIRECTORY:
                return
            raise

        for p in dirents:
            child = posixpath.join(path, p)
            yield child
            for c in self.find_children(child, revnum):
                yield c
278             for c in self.find_children(os.path.join(path, p), revnum):
279                 yield c