Fix test.
[jelmer/subvertpy.git] / logwalker.py
1 # Copyright (C) 2006 Jelmer Vernooij <jelmer@samba.org>
2
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17 from bzrlib.errors import NoSuchRevision, BzrError, NotBranchError
18 from bzrlib.progress import ProgressBar, DummyProgress
19 from bzrlib.trace import mutter
20
21 import os
22
23 from svn.core import SubversionException
24 from transport import SvnRaTransport
25 import svn.core
26
27 from bsddb import dbshelve as shelve
28
# Open cache shelves, keyed by cache file path.  LogWalker looks a cache
# file up here before opening it, so every instance that uses the same
# cache file shares a single open shelf.
shelves = {}
30
31 def _escape_commit_message(message):
32     """Replace xml-incompatible control characters."""
33     import re
34     # FIXME: RBC 20060419 this should be done by the revision
35     # serialiser not by commit. Then we can also add an unescaper
36     # in the deserializer and start roundtripping revision messages
37     # precisely. See repository_implementations/test_repository.py
38     
39     # Python strings can include characters that can't be
40     # represented in well-formed XML; escape characters that
41     # aren't listed in the XML specification
42     # (http://www.w3.org/TR/REC-xml/#NT-Char).
43     message, _ = re.subn(
44         u'[^\x09\x0A\x0D\u0020-\uD7FF\uE000-\uFFFD]+',
45         lambda match: match.group(0).encode('unicode_escape'),
46         message)
47     return message
48
49
class NotSvnBranchPath(BzrError):
    """Raised for a path the branching scheme does not accept as a branch."""

    # Message template; filled in from the branch_path attribute.
    _fmt = "{%(branch_path)s} is not a valid Svn branch path"

    def __init__(self, branch_path):
        """Remember the offending path so it can be used in the message.

        :param branch_path: Path that is not a valid branch path.
        """
        BzrError.__init__(self)
        self.branch_path = branch_path
56
57
class LogWalker(object):
    """Easy way to access the history of a Subversion repository.

    Revision information is retrieved with the transport's ``get_log`` and
    kept in ``self.revisions``, keyed by ``str(revnum)``.  Every entry is a
    dict with the keys ``'paths'``, ``'author'``, ``'date'`` and
    ``'message'``.  ``'paths'`` maps a path (leading and trailing slashes
    stripped) to a tuple ``(action, copyfrom_path, copyfrom_rev)``.
    """

    def __init__(self, scheme, transport=None, cache_dir=None,
                 last_revnum=None, pb=None):
        """Create a new instance.

        :param scheme:  Branching scheme to use.
        :param transport:   SvnRaTransport to use to access the repository.
                            Required, despite the default value.
        :param cache_dir:   Optional cache directory to use. Doesn't cache if
                            not set.
        :param last_revnum: Last known revnum in the repository. Will be
                            determined if not specified.
        :param pb:          Progress bar to report progress to. Not used by
                            the constructor itself.
        """
        assert isinstance(transport, SvnRaTransport)

        if last_revnum is None:
            last_revnum = transport.get_latest_revnum()

        self.last_revnum = last_revnum

        self.transport = transport.clone()
        self.scheme = scheme

        if cache_dir is not None:
            cache_file = os.path.join(cache_dir, 'log-v2')
            # Open every cache file only once; later instances reuse the
            # shelf that is already open.
            if cache_file not in shelves:
                shelves[cache_file] = shelve.open(cache_file)
            self.revisions = shelves[cache_file]
        else:
            self.revisions = {}
        # Highest revision number already stored.  Derived from the entry
        # count, which presumably relies on the stored revisions forming a
        # contiguous range that starts at revision 0.
        self.saved_revnum = max(len(self.revisions)-1, 0)

    def fetch_revisions(self, to_revnum, pb=None):
        """Fetch information about all revisions in the remote repository
        until to_revnum.

        The log is requested from the last saved revision up to the larger
        of to_revnum and the last known revision number.

        :param to_revnum: End of range to fetch information for
        :param pb: Optional progress bar to use. If not given, a progress
                   bar is created here (a dummy one when fewer than 10
                   revisions have to be fetched).
        :raises NoSuchRevision: If Subversion reports that the requested
                                revision does not exist.
        """
        import sys

        def rcvr(orig_paths, rev, author, date, message, pool):
            pb.update('fetching svn revision info', rev, to_revnum)
            paths = {}
            if orig_paths is None:
                orig_paths = {}
            for p in orig_paths:
                copyfrom_path = orig_paths[p].copyfrom_path
                if copyfrom_path:
                    copyfrom_path = copyfrom_path.strip("/")
                paths[p.strip("/")] = (orig_paths[p].action,
                            copyfrom_path, orig_paths[p].copyfrom_rev)

            self.revisions[str(rev)] = {
                    'paths': paths,
                    'author': author,
                    'date': date,
                    'message': message
                    }
            self.saved_revnum = rev

        to_revnum = max(self.last_revnum, to_revnum)

        # Honour a progress bar handed in by the caller; only create one
        # when none was given.
        if pb is None:
            # Don't bother for only a few revisions
            if abs(self.saved_revnum-to_revnum) < 10:
                pb = DummyProgress()
            else:
                pb = ProgressBar()

        try:
            try:
                mutter('getting log %r:%r' % (self.saved_revnum, to_revnum))
                self.transport.get_log(["/"], self.saved_revnum, to_revnum,
                               0, True, True, rcvr)
            finally:
                pb.clear()
        except SubversionException:
            # The error number is the second element of the exception's
            # arguments.  It is read through sys.exc_info() so that the
            # syntax is valid on every Python version.
            num = sys.exc_info()[1].args[1]
            if num == svn.core.SVN_ERR_FS_NO_SUCH_REVISION:
                raise NoSuchRevision(branch=self,
                    revision="Revision number %d" % to_revnum)
            raise

    def follow_history(self, branch_path, revnum):
        """Return iterator over all the revisions between revnum and
        0 that touch branch_path.

        Yields tuples ``(branch_path, changed_paths, revnum)``, newest
        revision first, where changed_paths maps each changed path to its
        ``(action, copyfrom_path, copyfrom_rev)`` tuple.  When the branch
        path itself was copied from somewhere else, the walk continues at
        the copy source path and revision.

        :param branch_path:   Branch path to start reporting (in revnum).
                              None matches every path.
        :param revnum:        Start revision.
        :raises NotSvnBranchPath: If branch_path is not None and the
                                  branching scheme doesn't consider it a
                                  branch.
        """
        assert revnum >= 0

        if revnum == 0 and branch_path in (None, ""):
            return

        if branch_path is not None and not self.scheme.is_branch(branch_path):
            raise NotSvnBranchPath(branch_path)

        if branch_path:
            branch_path = branch_path.strip("/")

        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)

        # Set after a copy was found: revisions are skipped until this one.
        continue_revnum = None
        # Walk from revnum down to 1; revision 0 is never reported.
        for i in range(revnum, 0, -1):
            if not (continue_revnum is None or continue_revnum == i):
                continue

            continue_revnum = None

            paths = self.revisions[str(i)]['paths']
            changed_paths = {}
            for p in paths:
                if (branch_path is None or
                    p == branch_path or
                    branch_path == "" or
                    p.startswith(branch_path+"/")):

                    try:
                        (bp, rp) = self.scheme.unprefix(p)
                    except NotBranchError:
                        # Path is not inside any branch; ignore it.
                        continue
                    changed_paths.setdefault(bp, {})[p] = paths[p]

            assert branch_path is None or len(changed_paths) <= 1

            for bp in changed_paths:
                yield (bp, changed_paths[bp], i)

            if (branch_path is not None and
                branch_path in paths and
                paths[branch_path][1] is not None):
                # In this revision, this branch was copied from
                # somewhere else
                # FIXME: What if copyfrom_path is not a branch path?
                continue_revnum = paths[branch_path][2]
                branch_path = paths[branch_path][1]

    def find_branches(self, revnum):
        """Find all branches that were created or removed up to revnum.

        Walks the revisions 0..revnum in order.  Yields ``(path, i, False)``
        for every branch path that is deleted or replaced in revision i,
        and finally ``(path, revnum, True)`` for every branch path that was
        added and not removed afterwards.

        :param revnum: Last revision to search for branches.
        """
        created_branches = {}

        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)

        for i in range(revnum+1):
            if i == 0:
                # Revision 0 only contains the (empty) repository root.
                paths = {'': ('A', None, None)}
            else:
                paths = self.revisions[str(i)]['paths']
            for p in paths:
                if self.scheme.is_branch(p):
                    action = paths[p][0]
                    if action in ('R', 'D'):
                        # The branch may never have been recorded as added
                        # (e.g. it came into existence through a change to
                        # a parent directory), so don't insist on an entry.
                        created_branches.pop(p, None)
                        yield (p, i, False)

                    if action in ('A', 'R'):
                        created_branches[p] = i

        for p in created_branches:
            yield (p, revnum, True)

    def get_revision_info(self, revnum, pb=None):
        """Obtain basic information for a specific revision.

        :param revnum: Revision number.
        :param pb: Optional progress bar, passed on when revisions have to
                   be fetched.
        :returns: Tuple with author, escaped log message, date and changed
                  paths of the revision.
        """
        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum, pb)
        rev = self.revisions[str(revnum)]
        return (rev['author'],
             _escape_commit_message(rev['message']),
             rev['date'], rev['paths'])

    def find_latest_change(self, path, revnum):
        """Find latest revision that touched path.

        Only changes to exactly this path are considered.

        :param path: Path to check for changes
        :param revnum: First revision to check
        :returns: Number of the newest revision <= revnum in which path is
                  listed as changed, or 0 if there is none.
        """
        while revnum > 0 and not self.touches_path(path, revnum):
            revnum = revnum - 1
        return revnum

    def touches_path(self, path, revnum):
        """Check whether path was changed in specified revision.

        :param path:  Path to check
        :param revnum:  Revision to check
        :returns: True if path is among the changed paths of revnum.
        """
        if revnum > self.saved_revnum:
            self.fetch_revisions(revnum)
        return (path in self.revisions[str(revnum)]['paths'])

    def find_children(self, path, revnum):
        """Find all children of path in revnum.

        Recursively yields the paths of all entries below path.  Nothing
        is yielded if path is not a directory.

        :param path: Path of the directory to list.
        :param revnum: Revision in which to list the directory.
        """
        # Subversion paths always use "/" as separator, whatever the
        # platform, so they are joined with posixpath rather than os.path.
        import posixpath
        import sys
        # TODO: Find children by walking history, or use
        # cache?
        mutter("svn ls -r %d '%r'" % (revnum, path))

        try:
            (dirents, _, _) = self.transport.get_dir(
                "/" + path.encode('utf8'), revnum)
        except SubversionException:
            # See fetch_revisions: the error number is the second element
            # of the exception's arguments.
            num = sys.exc_info()[1].args[1]
            if num == svn.core.SVN_ERR_FS_NOT_DIRECTORY:
                return
            raise

        for p in dirents:
            child = posixpath.join(path, p)
            yield child
            for c in self.find_children(child, revnum):
                yield c