1 # Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 from bzrlib.errors import (InvalidRevisionId, NoSuchRevision)
20 REVISION_ID_PREFIX = "svn-v%d-" % MAPPING_VERSION
24 def escape_svn_path(x):
25 if isinstance(x, unicode):
27 return urllib.quote(x, "")
28 unescape_svn_path = urllib.unquote
31 def parse_svn_revision_id(revid):
32 """Parse an existing Subversion-based revision id.
34 :param revid: The revision id.
35 :raises: InvalidRevisionId
36 :return: Tuple with uuid, branch path, revision number and scheme.
39 assert revid is not None
40 assert isinstance(revid, basestring)
42 if not revid.startswith(REVISION_ID_PREFIX):
43 raise InvalidRevisionId(revid, "")
46 (version, uuid, branch_path, srevnum) = revid.split(":")
48 raise InvalidRevisionId(revid, "")
50 if not version.startswith(REVISION_ID_PREFIX):
51 raise InvalidRevisionId(revid, "")
53 scheme = version[len(REVISION_ID_PREFIX):]
55 return (uuid, unescape_svn_path(branch_path), int(srevnum), scheme)
58 def generate_svn_revision_id(uuid, revnum, path, scheme):
59 """Generate a unambiguous revision id.
61 :param uuid: UUID of the repository.
62 :param revnum: Subversion revision number.
63 :param path: Branch path.
64 :param scheme: Name of the branching scheme in use
66 :return: New revision id.
68 assert isinstance(revnum, int)
69 assert isinstance(path, basestring)
71 assert revnum > 0 or path == "", \
72 "Trying to generate revid for (%r,%r)" % (path, revnum)
73 return "%s%s:%s:%s:%d" % (REVISION_ID_PREFIX, scheme, uuid, \
74 escape_svn_path(path.strip("/")), revnum)
77 class RevidMap(object):
78 """Revision id mapping store.
80 Stores mapping from revid -> (path, revnum, scheme)
82 def __init__(self, cache_db=None):
84 from cache import sqlite3
85 self.cachedb = sqlite3.connect(":memory:")
87 self.cachedb = cache_db
88 self.cachedb.executescript("""
89 create table if not exists revmap (revid text, path text, min_revnum integer, max_revnum integer, scheme text);
90 create index if not exists revid on revmap (revid);
91 create unique index if not exists revid_path_scheme on revmap (revid, path, scheme);
92 create index if not exists lookup_branch_revnum on revmap (max_revnum, min_revnum, path, scheme);
93 create table if not exists revno_cache (revid text unique, dist_to_origin integer);
94 create index if not exists revid on revno_cache (revid);
98 def lookup_revid(self, revid):
99 ret = self.cachedb.execute(
100 "select path, min_revnum, max_revnum, scheme from revmap where revid='%s'" % revid).fetchone()
102 raise NoSuchRevision(self, revid)
103 return (str(ret[0]), ret[1], ret[2], ret[3])
105 def lookup_branch_revnum(self, revnum, path, scheme):
106 """Lookup a revision by revision number, branch path and branching scheme.
108 :param revnum: Subversion revision number.
109 :param path: Subversion branch path.
110 :param scheme: Branching scheme name
112 assert isinstance(revnum, int)
113 assert isinstance(path, basestring)
114 assert isinstance(scheme, basestring)
115 revid = self.cachedb.execute(
116 "select revid from revmap where max_revnum = min_revnum and min_revnum='%s' and path='%s' and scheme='%s'" % (revnum, path, scheme)).fetchone()
117 if revid is not None:
121 def insert_revid(self, revid, branch, min_revnum, max_revnum, scheme,
122 dist_to_origin=None):
123 assert revid is not None and revid != ""
124 assert isinstance(scheme, basestring)
125 cursor = self.cachedb.execute(
126 "update revmap set min_revnum = MAX(min_revnum,?), max_revnum = MIN(max_revnum, ?) WHERE revid=? AND path=? AND scheme=?",
127 (min_revnum, max_revnum, revid, branch, scheme))
128 if cursor.rowcount == 0:
129 self.cachedb.execute(
130 "insert into revmap (revid,path,min_revnum,max_revnum,scheme) VALUES (?,?,?,?,?)",
131 (revid, branch, min_revnum, max_revnum, scheme))
132 if dist_to_origin is not None:
133 self.cachedb.execute(
134 "replace into revno_cache (revid,dist_to_origin) VALUES (?,?)",
135 (revid, dist_to_origin))
137 def lookup_dist_to_origin(self, revid):
138 revno = self.cachedb.execute(
139 "select dist_to_origin from revno_cache where revid='%s'" % revid).fetchone()
140 if revno is not None and revno[0] is not None:
144 def insert_revision_history(self, revhistory):
146 for revid in revhistory:
147 self.cachedb.execute(
148 "replace into revno_cache (revid,dist_to_origin) VALUES (?,?)",
151 self.cachedb.commit()