-# Copyright (C) 2005-2006 Jelmer Vernooij <jelmer@samba.org>
+# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""Checkouts and working trees (working copies)."""
-from binascii import hexlify
+from bzrlib.branch import PullResult
from bzrlib.bzrdir import BzrDirFormat, BzrDir
-from bzrlib.delta import compare_trees
-from bzrlib.errors import NotBranchError, NoSuchFile
-from bzrlib.inventory import (Inventory, InventoryDirectory, InventoryFile,
- ROOT_ID)
+from bzrlib.errors import (InvalidRevisionId, NotBranchError, NoSuchFile,
+ NoRepositoryPresent, BzrError)
+from bzrlib.inventory import Inventory, InventoryFile, InventoryLink
from bzrlib.lockable_files import TransportLock, LockableFiles
from bzrlib.lockdir import LockDir
-from bzrlib.osutils import rand_bytes, fingerprint_file
-from bzrlib.progress import DummyProgress
+from bzrlib.osutils import fingerprint_file
from bzrlib.revision import NULL_REVISION
from bzrlib.trace import mutter
-from bzrlib.tree import EmptyTree
+from bzrlib.tree import RevisionTree
+from bzrlib.transport.local import LocalTransport
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat
from branch import SvnBranch
-from repository import SvnRepository, escape_svn_path, SVN_PROP_BZR_MERGE
+from convert import SvnConverter
+from repository import (SvnRepository, SVN_PROP_BZR_MERGE,
+ SVN_PROP_SVK_MERGE, SVN_PROP_BZR_FILEIDS,
+ revision_id_to_svk_feature)
+from revids import escape_svn_path
from scheme import BranchingScheme
-from transport import (SvnRaTransport, svn_config,
- svn_to_bzr_url)
+from transport import (SvnRaTransport, svn_config, bzr_to_svn_url,
+ _create_auth_baton)
from tree import SvnBasisTree
+from copy import copy
import os
+import urllib
import svn.core, svn.wc
-from svn.core import SubversionException
+from svn.core import SubversionException, Pool
+
+from errors import NoCheckoutSupport
+
+class WorkingTreeInconsistent(BzrError):
+ _fmt = """Working copy is in inconsistent state (%(min_revnum)d:%(max_revnum)d)"""
+
+ def __init__(self, min_revnum, max_revnum):
+ self.min_revnum = min_revnum
+ self.max_revnum = max_revnum
+
class SvnWorkingTree(WorkingTree):
- """Implementation of WorkingTree that uses a Subversion
- Working Copy for storage."""
+ """WorkingTree implementation that uses a Subversion Working Copy for storage."""
def __init__(self, bzrdir, local_path, branch):
self._format = SvnWorkingTreeFormat()
self.basedir = local_path
self.bzrdir = bzrdir
self._branch = branch
self.base_revnum = 0
+ self.pool = Pool()
self.client_ctx = svn.client.create_context()
- self.client_ctx.log_msg_func2 = svn.client.svn_swig_py_get_commit_log_func
- self.client_ctx.log_msg_baton2 = self.log_message_func
-
- self._set_inventory(self.read_working_inventory())
- mutter('working inv: %r' % self.read_working_inventory().entries())
-
+ self.client_ctx.config = svn_config
+ self.client_ctx.log_msg_func2 = \
+ svn.client.svn_swig_py_get_commit_log_func
+ self.client_ctx.auth_baton = _create_auth_baton(self.pool)
+
+ self._get_wc()
+ status = svn.wc.revision_status(self.basedir, None, True, None, None)
+ if status.min_rev != status.max_rev:
+ #raise WorkingTreeInconsistent(status.min_rev, status.max_rev)
+ rev = svn.core.svn_opt_revision_t()
+ rev.kind = svn.core.svn_opt_revision_number
+ rev.value.number = status.max_rev
+ assert status.max_rev == svn.client.update(self.basedir, rev,
+ True, self.client_ctx, Pool())
+
+ self.base_revnum = status.max_rev
+ self.base_tree = SvnBasisTree(self)
self.base_revid = branch.repository.generate_revision_id(
self.base_revnum, branch.branch_path)
- mutter('basis inv: %r' % self.basis_tree().inventory.entries())
- self.controldir = os.path.join(self.basedir, svn.wc.get_adm_dir(), 'bzr')
+
+ self.read_working_inventory()
+
+ self.controldir = os.path.join(self.basedir, svn.wc.get_adm_dir(),
+ 'bzr')
try:
os.makedirs(self.controldir)
os.makedirs(os.path.join(self.controldir, 'lock'))
except OSError:
pass
- control_transport = bzrdir.transport.clone(os.path.join(svn.wc.get_adm_dir(), 'bzr'))
+ control_transport = bzrdir.transport.clone(os.path.join(
+ svn.wc.get_adm_dir(), 'bzr'))
self._control_files = LockableFiles(control_transport, 'lock', LockDir)
def lock_write(self):
pass
def get_ignore_list(self):
- ignores = []
+ ignores = [svn.wc.get_adm_dir()] + \
+ svn.wc.get_default_ignores(svn_config)
def dir_add(wc, prefix):
- ignores.append(os.path.join(prefix, svn.wc.get_adm_dir()))
- for pat in svn.wc.get_ignores(svn_config, wc):
- ignores.append(os.path.join(prefix, pat))
+ ignorestr = svn.wc.prop_get(svn.core.SVN_PROP_IGNORE,
+ self.abspath(prefix).rstrip("/"), wc)
+ if ignorestr is not None:
+ for pat in ignorestr.splitlines():
+ ignores.append("./"+os.path.join(prefix, pat))
entries = svn.wc.entries_read(wc, False)
for entry in entries:
subprefix = os.path.join(prefix, entry)
- subwc = svn.wc.adm_open3(wc, self.abspath(subprefix), False, 0, None)
+ subwc = svn.wc.adm_open3(wc, self.abspath(subprefix), False,
+ 0, None)
try:
dir_add(subwc, subprefix)
finally:
def _write_inventory(self, inv):
pass
- def is_ignored(self, filename):
- if svn.wc.is_adm_dir(os.path.basename(filename)):
- return True
-
- (wc, name) = self._get_rel_wc(filename)
- assert wc
- try:
- ignores = svn.wc.get_ignores(svn_config, wc)
- from fnmatch import fnmatch
- for pattern in ignores:
- if fnmatch(name, pattern):
- return True
- return False
- finally:
- svn.wc.adm_close(wc)
-
def is_control_filename(self, path):
return svn.wc.is_adm_dir(path)
def remove(self, files, verbose=False, to_file=None):
+ # FIXME: Use to_file argument
+ # FIXME: Use verbose argument
+ assert isinstance(files, list)
wc = self._get_wc(write_lock=True)
try:
for file in files:
finally:
svn.wc.adm_close(wc)
+ for file in files:
+ self._change_fileid_mapping(None, file)
+ self.read_working_inventory()
+
def _get_wc(self, relpath="", write_lock=False):
- return svn.wc.adm_open3(None, self.abspath(relpath).rstrip("/"), write_lock, 0, None)
+ return svn.wc.adm_open3(None, self.abspath(relpath).rstrip("/"),
+ write_lock, 0, None)
def _get_rel_wc(self, relpath, write_lock=False):
dir = os.path.dirname(relpath)
file = os.path.basename(relpath)
return (self._get_wc(dir, write_lock), file)
- def move(self, from_paths, to_name):
+ def move(self, from_paths, to_dir=None, after=False, **kwargs):
+ # FIXME: Use after argument
revt = svn.core.svn_opt_revision_t()
revt.kind = svn.core.svn_opt_revision_working
- to_wc = self._get_wc(to_name, write_lock=True)
- try:
- for entry in from_paths:
- svn.wc.copy(self.abspath(entry), to_wc, os.path.basename(entry), None, None)
- finally:
- svn.wc.adm_close(to_wc)
-
for entry in from_paths:
- self.remove([entry])
-
- def rename_one(self, from_rel, to_rel):
+ try:
+ to_wc = self._get_wc(to_dir, write_lock=True)
+ svn.wc.copy(self.abspath(entry), to_wc,
+ os.path.basename(entry), None, None)
+ finally:
+ svn.wc.adm_close(to_wc)
+ try:
+ from_wc = self._get_wc(write_lock=True)
+ svn.wc.delete2(self.abspath(entry), from_wc, None, None, None)
+ finally:
+ svn.wc.adm_close(from_wc)
+ new_name = "%s/%s" % (to_dir, os.path.basename(entry))
+ self._change_fileid_mapping(self.inventory.path2id(entry), new_name)
+ self._change_fileid_mapping(None, entry)
+
+ self.read_working_inventory()
+
+ def rename_one(self, from_rel, to_rel, after=False):
+ # FIXME: Use after
revt = svn.core.svn_opt_revision_t()
revt.kind = svn.core.svn_opt_revision_unspecified
(to_wc, to_file) = self._get_rel_wc(to_rel, write_lock=True)
+ from_id = self.inventory.path2id(from_rel)
try:
svn.wc.copy(self.abspath(from_rel), to_wc, to_file, None, None)
svn.wc.delete2(self.abspath(from_rel), to_wc, None, None, None)
finally:
svn.wc.adm_close(to_wc)
+ self._change_fileid_mapping(None, from_rel)
+ self._change_fileid_mapping(from_id, to_rel)
+ self.read_working_inventory()
+
+ def path_to_file_id(self, revnum, current_revnum, path):
+ """Generate a bzr file id from a Subversion file name.
+
+ :param revnum: Revision number.
+ :param path: Absolute path.
+ :return: Tuple with file id and revision id.
+ """
+ assert isinstance(revnum, int) and revnum >= 0
+ assert isinstance(path, basestring)
+
+ (_, rp) = self.branch.repository.scheme.unprefix(path)
+ entry = self.base_tree.id_map[rp]
+ assert entry[0] is not None
+ assert isinstance(entry[0], str), "fileid %r for %r is not a string" % (entry[0], path)
+ return entry
def read_working_inventory(self):
inv = Inventory()
def add_file_to_inv(relpath, id, revid, parent_id):
"""Add a file to the inventory."""
- file = InventoryFile(id, os.path.basename(relpath), parent_id)
- file.revision = revid
- try:
- data = fingerprint_file(open(self.abspath(relpath)))
- file.text_sha1 = data['sha1']
- file.text_size = data['size']
- file.executable = self.is_executable(id, relpath)
+ if os.path.islink(self.abspath(relpath)):
+ file = InventoryLink(id, os.path.basename(relpath), parent_id)
+ file.revision = revid
+ file.symlink_target = os.readlink(self.abspath(relpath))
+ file.text_sha1 = None
+ file.text_size = None
+ file.executable = False
inv.add(file)
- except IOError:
- # Ignore non-existing files
- pass
+ else:
+ file = InventoryFile(id, os.path.basename(relpath), parent_id)
+ file.revision = revid
+ try:
+ data = fingerprint_file(open(self.abspath(relpath)))
+ file.text_sha1 = data['sha1']
+ file.text_size = data['size']
+ file.executable = self.is_executable(id, relpath)
+ inv.add(file)
+ except IOError:
+ # Ignore non-existing files
+ pass
def find_copies(url, relpath=""):
wc = self._get_wc(relpath)
find_copies(subrelpath)
svn.wc.adm_close(wc)
- def find_ids(entry):
- relpath = entry.url[len(entry.repos):].strip("/")
+ def find_ids(entry, rootwc):
+ relpath = urllib.unquote(entry.url[len(entry.repos):].strip("/"))
+ assert entry.schedule in (svn.wc.schedule_normal,
+ svn.wc.schedule_delete,
+ svn.wc.schedule_add,
+ svn.wc.schedule_replace)
if entry.schedule == svn.wc.schedule_normal:
assert entry.revision >= 0
# Keep old id
- mutter('stay: %r' % relpath)
- return self.branch.repository.path_to_file_id(entry.revision,
+ return self.path_to_file_id(entry.cmt_rev, entry.revision,
relpath)
elif entry.schedule == svn.wc.schedule_delete:
return (None, None)
entry.schedule == svn.wc.schedule_replace):
# See if the file this file was copied from disappeared
# and has no other copies -> in that case, take id of other file
- mutter('copies(%r): %r' % (relpath, list(find_copies(entry.copyfrom_url))))
- if entry.copyfrom_url and list(find_copies(entry.copyfrom_url)) == [relpath]:
- return self.branch.repository.path_to_file_id(entry.copyfrom_rev,
- entry.copyfrom_url[len(entry.repos):])
+ if (entry.copyfrom_url and
+ list(find_copies(entry.copyfrom_url)) == [relpath]):
+ return self.path_to_file_id(entry.copyfrom_rev,
+ entry.revision, entry.copyfrom_url[len(entry.repos):])
+ ids = self._get_new_file_ids(rootwc)
+ if ids.has_key(relpath):
+ return (ids[relpath], None)
return ("NEW-" + escape_svn_path(entry.url[len(entry.repos):].strip("/")), None)
- else:
- assert 0
def add_dir_to_inv(relpath, wc, parent_id):
entries = svn.wc.entries_read(wc, False)
-
entry = entries[""]
-
- (id, revid) = find_ids(entry)
-
+ assert parent_id is None or isinstance(parent_id, str), \
+ "%r is not a string" % parent_id
+ (id, revid) = find_ids(entry, rootwc)
if id is None:
+ mutter('no id for %r' % entry.url)
return
-
- self.base_revnum = max(self.base_revnum, entry.revision)
+ assert revid is None or isinstance(revid, str), "%r is not a string" % revid
+ assert isinstance(id, str), "%r is not a string" % id
# First handle directory itself
- if id is ROOT_ID:
+ inv.add_path(relpath, 'directory', id, parent_id).revision = revid
+ if relpath == "":
inv.revision_id = revid
- else:
- inventry = InventoryDirectory(id, os.path.basename(relpath), parent_id)
- inventry.revision = revid
- inv.add(inventry)
for name in entries:
if name == "":
finally:
svn.wc.adm_close(subwc)
else:
- (subid, subrevid) = find_ids(entry)
+ (subid, subrevid) = find_ids(entry, rootwc)
if subid:
- self.base_revnum = max(self.base_revnum, entry.revision)
add_file_to_inv(subrelpath, subid, subrevid, id)
+ else:
+ mutter('no id for %r' % entry.url)
- wc = self._get_wc()
+ rootwc = self._get_wc()
try:
- add_dir_to_inv("", wc, None)
+ add_dir_to_inv("", rootwc, None)
finally:
- svn.wc.adm_close(wc)
+ svn.wc.adm_close(rootwc)
+ self._set_inventory(inv, dirty=False)
return inv
def set_last_revision(self, revid):
mutter('setting last revision to %r' % revid)
if revid is None or revid == NULL_REVISION:
self.base_revid = revid
+ self.base_revnum = 0
+ self.base_tree = RevisionTree(self, Inventory(), revid)
return
+ rev = self.branch.lookup_revision_id(revid)
+ self.base_revnum = rev
+ self.base_revid = revid
+ self.base_tree = SvnBasisTree(self)
+
# TODO: Implement more efficient version
newrev = self.branch.repository.get_revision(revid)
newrevtree = self.branch.repository.revision_tree(revid)
def update_settings(wc, path):
id = newrevtree.inventory.path2id(path)
mutter("Updating settings for %r" % id)
- (_, revnum) = self.branch.repository.parse_revision_id(
+ revnum = self.branch.lookup_revision_id(
newrevtree.inventory[id].revision)
svn.wc.process_committed2(self.abspath(path).rstrip("/"), wc,
svn.wc.adm_close(wc)
self.base_revid = revid
-
- def log_message_func(self, items, pool):
- """ Simple log message provider for unit tests. """
- return self._message
-
- def commit(self, message=None, revprops=None, timestamp=None, timezone=None, committer=None, rev_id=None, allow_pointless=True,
- strict=False, verbose=False, local=False, reporter=None, config=None, specific_files=None):
+ def commit(self, message=None, message_callback=None, revprops=None,
+ timestamp=None, timezone=None, committer=None, rev_id=None,
+ allow_pointless=True, strict=False, verbose=False, local=False,
+ reporter=None, config=None, specific_files=None):
+ # FIXME: Use allow_pointless
+ # FIXME: Use committer
+ # FIXME: Use verbose
+ # FIXME: Use reporter
+ # FIXME: Use revprops
+ # FIXME: Raise exception when local is True
+ # FIXME: Use strct
assert timestamp is None
assert timezone is None
assert rev_id is None
else:
specific_files = [self.basedir.encode('utf8')]
- assert isinstance(message, basestring)
- self._message = message
+ if message_callback is not None:
+ def log_message_func(items, pool):
+ """ Simple log message provider for unit tests. """
+ return message_callback(self).encode("utf-8")
+ else:
+ assert isinstance(message, basestring)
+ def log_message_func(items, pool):
+ """ Simple log message provider for unit tests. """
+ return message.encode("utf-8")
- commit_info = svn.client.commit3(specific_files, True, False, self.client_ctx)
+ self.client_ctx.log_msg_baton2 = log_message_func
+ commit_info = svn.client.commit3(specific_files, True, False,
+ self.client_ctx)
+ self.client_ctx.log_msg_baton2 = None
- revid = self.branch.repository.generate_revision_id(commit_info.revision, self.branch.branch_path)
+ revid = self.branch.repository.generate_revision_id(
+ commit_info.revision, self.branch.branch_path)
self.base_revid = revid
+ self.base_revnum = commit_info.revision
+ self.base_tree = SvnBasisTree(self)
+
+ #FIXME: Use public API:
+ self.branch.revision_history()
self.branch._revision_history.append(revid)
return revid
- def add(self, files, ids=None):
+ def add(self, files, ids=None, kinds=None):
+ # FIXME: Use kinds
+ if isinstance(files, str):
+ files = [files]
+ if isinstance(ids, str):
+ ids = [ids]
+ if ids:
+ ids = copy(ids)
+ ids.reverse()
assert isinstance(files, list)
- wc = self._get_wc(write_lock=True)
- try:
- for f in files:
+ for f in files:
+ try:
+ wc = self._get_wc(os.path.dirname(f), write_lock=True)
try:
svn.wc.add2(os.path.join(self.basedir, f), wc, None, 0,
None, None, None)
if ids:
- svn.wc.prop_set2('bzr:fileid', ids.pop(), relpath, wc,
- False)
+ self._change_fileid_mapping(ids.pop(), f, wc)
except SubversionException, (_, num):
if num == svn.core.SVN_ERR_ENTRY_EXISTS:
continue
elif num == svn.core.SVN_ERR_WC_PATH_NOT_FOUND:
raise NoSuchFile(path=f)
raise
- finally:
- svn.wc.adm_close(wc)
+ finally:
+ svn.wc.adm_close(wc)
+ self.read_working_inventory()
def basis_tree(self):
if self.base_revid is None or self.base_revid == NULL_REVISION:
- return EmptyTree()
-
- return SvnBasisTree(self, self.base_revid)
-
- def pull(self, source, overwrite=False, stop_revision=None):
+ return self.branch.repository.revision_tree(self.base_revid)
+
+ return self.base_tree
+
+ def pull(self, source, overwrite=False, stop_revision=None,
+ delta_reporter=None):
+ # FIXME: Use delta_reporter
+ # FIXME: Use overwrite
+ result = PullResult()
+ result.source_branch = source
+ result.master_branch = None
+ result.target_branch = self.branch
+ (result.old_revno, result.old_revid) = self.branch.last_revision_info()
if stop_revision is None:
stop_revision = self.branch.last_revision()
rev = svn.core.svn_opt_revision_t()
rev.kind = svn.core.svn_opt_revision_number
- rev.value.number = self.branch.repository.parse_revision_id(stop_revision)[1]
+ rev.value.number = self.branch.lookup_revision_id(stop_revision)
fetched = svn.client.update(self.basedir, rev, True, self.client_ctx)
self.base_revid = self.branch.repository.generate_revision_id(fetched, self.branch.branch_path)
- return fetched-rev.value.number
+ result.new_revid = self.branch.generate_revision_id(fetched)
+ result.new_revno = self.branch.revision_id_to_revno(result.new_revid)
+ return result
- def get_file_sha1(self, file_id, path=None):
+ def get_file_sha1(self, file_id, path=None, stat_value=None):
if not path:
path = self._inventory.id2path(file_id)
-
return fingerprint_file(open(self.abspath(path)))['sha1']
+ def _change_fileid_mapping(self, id, path, wc=None):
+ if wc is None:
+ subwc = self._get_wc(write_lock=True)
+ else:
+ subwc = wc
+ new_entries = self._get_new_file_ids(subwc)
+ if id is None:
+ if new_entries.has_key(path):
+ del new_entries[path]
+ else:
+ assert isinstance(id, str)
+ new_entries[path] = id
+ committed = self.branch.repository.branchprop_list.get_property(
+ self.branch.branch_path,
+ self.base_revnum,
+ SVN_PROP_BZR_FILEIDS, "")
+ existing = committed + "".join(map(lambda (path, id): "%s\t%s\n" % (path, id), new_entries.items()))
+ if existing != "":
+ svn.wc.prop_set(SVN_PROP_BZR_FILEIDS, existing.encode("utf-8"), self.basedir, subwc)
+ if wc is None:
+ svn.wc.adm_close(subwc)
+
+ def _get_new_file_ids(self, wc):
+ committed = self.branch.repository.branchprop_list.get_property(
+ self.branch.branch_path, self.base_revnum,
+ SVN_PROP_BZR_FILEIDS, "")
+ existing = svn.wc.prop_get(SVN_PROP_BZR_FILEIDS, self.basedir, wc)
+ if existing is None:
+ return {}
+ else:
+ return dict(map(lambda x: str(x).split("\t"),
+ existing[len(committed):].splitlines()))
+
def _get_bzr_merges(self):
- return self.branch.repository._get_dir_prop(self.branch.branch_path,
- self.base_revnum,
- SVN_PROP_BZR_MERGE, "")
+ return self.branch.repository.branchprop_list.get_property(
+ self.branch.branch_path, self.base_revnum,
+ SVN_PROP_BZR_MERGE, "")
def _get_svk_merges(self):
- return self.branch.repository._get_dir_prop(self.branch.branch_path,
- self.base_revnum,
- SVN_PROP_SVK_MERGE, "")
+ return self.branch.repository.branchprop_list.get_property(
+ self.branch.branch_path, self.base_revnum,
+ SVN_PROP_SVK_MERGE, "")
def set_pending_merges(self, merges):
- # Set bzr:merge
- bzr_merge = self._get_bzr_merges()
- if len(merges) > 0:
- bzr_merge += "\t".join(merges) + "\n"
-
- # Set svk:merge
- svk_merge = self._get_svk_merges()
- if len(merges) > 0:
+ wc = self._get_wc(write_lock=True)
+ try:
+ # Set bzr:merge
+ if len(merges) > 0:
+ bzr_merge = "\t".join(merges) + "\n"
+ else:
+ bzr_merge = ""
+
+ svn.wc.prop_set(SVN_PROP_BZR_MERGE,
+ self._get_bzr_merges() + bzr_merge,
+ self.basedir, wc)
+
+ # Set svk:merge
+ svk_merge = ""
for merge in merges:
try:
svk_merge += revision_id_to_svk_feature(merge) + "\n"
except InvalidRevisionId:
pass
- wc = self._get_wc(write_lock=True)
- try:
- svn.wc.prop_set2(SVN_PROP_BZR_MERGE, bzr_merge, self.basedir, wc,
- False)
- svn.wc.prop_set2(SVN_PROP_SVK_MERGE, svk_merge, self.basedir, wc,
- False)
+ svn.wc.prop_set2(SVN_PROP_SVK_MERGE,
+ self._get_svk_merges() + svk_merge, self.basedir,
+ wc, False)
finally:
svn.wc.adm_close(wc)
def add_pending_merge(self, revid):
merges = self.pending_merges()
merges.append(revid)
- self.set_pending_merges(existing)
+ self.set_pending_merges(merges)
def pending_merges(self):
merged = self._get_bzr_merges().splitlines()
class SvnWorkingTreeFormat(WorkingTreeFormat):
+ """Subversion working copy format."""
def get_format_description(self):
return "Subversion Working Copy"
def initialize(self, a_bzrdir, revision_id=None):
- # FIXME
raise NotImplementedError(self.initialize)
def open(self, a_bzrdir):
- # FIXME
raise NotImplementedError(self.initialize)
class SvnCheckout(BzrDir):
+ """BzrDir implementation for Subversion checkouts (directories
+ containing a .svn subdirectory."""
def __init__(self, transport, format):
super(SvnCheckout, self).__init__(transport, format)
self.local_path = transport.local_abspath(".")
finally:
svn.wc.adm_close(wc)
- bzr_url = svn_to_bzr_url(svn_url)
-
self.remote_transport = SvnRaTransport(svn_url)
- self.svn_root_transport = self.remote_transport.get_root()
+ self.svn_root_transport = SvnRaTransport(self.remote_transport.get_repos_root())
self.root_transport = self.transport = transport
- self.branch_path = svn_url[len(svn_to_bzr_url(self.svn_root_transport.base)):]
+
+ self.branch_path = svn_url[len(bzr_to_svn_url(self.svn_root_transport.base)):]
self.scheme = BranchingScheme.guess_scheme(self.branch_path)
mutter('scheme for %r is %r' % (self.branch_path, self.scheme))
- if not self.scheme.is_branch(self.branch_path):
+ if not self.scheme.is_branch(self.branch_path) and not self.scheme.is_tag(self.branch_path):
raise NotBranchError(path=self.transport.base)
- def clone(self, path):
+ def clone(self, path, revision_id=None, force_new_repo=False):
raise NotImplementedError(self.clone)
- def open_workingtree(self, _unsupported=False):
+ def open_workingtree(self, _unsupported=False, recommend_upgrade=False):
return SvnWorkingTree(self, self.local_path, self.open_branch())
- def sprout(self, url, revision_id=None, basis=None, force_new_repo=False):
+ def sprout(self, url, revision_id=None, force_new_repo=False,
+ recurse='down'):
# FIXME: honor force_new_repo
+ # FIXME: Use recurse
result = BzrDirFormat.get_default_format().initialize(url)
- repo = self.open_repository()
- result_repo = repo.clone(result, revision_id, basis)
+ repo = self.find_repository()
+ repo.clone(result, revision_id)
branch = self.open_branch()
branch.sprout(result, revision_id)
result.create_workingtree()
return result
def open_repository(self):
- repos = SvnRepository(self, self.svn_root_transport)
- repos._format = self._format
- return repos
+ raise NoRepositoryPresent(self)
- # Subversion has all-in-one, so a repository is always present
- find_repository = open_repository
+ def find_repository(self):
+ return SvnRepository(self, self.svn_root_transport)
def create_workingtree(self, revision_id=None):
+ """See BzrDir.create_workingtree().
+
+ Not implemented for Subversion because having a .svn directory
+ implies having a working copy.
+ """
raise NotImplementedError(self.create_workingtree)
def create_branch(self):
def open_branch(self, unsupported=True):
"""See BzrDir.open_branch()."""
- repos = self.open_repository()
+ repos = self.find_repository()
try:
branch = SvnBranch(self.root_transport.base, repos, self.branch_path)
- except SubversionException, (msg, num):
+ except SubversionException, (_, num):
if num == svn.core.SVN_ERR_WC_NOT_DIRECTORY:
- raise NotBranchError(path=self.url)
+ raise NotBranchError(path=self.base)
raise
branch.bzrdir = self
class SvnWorkingTreeDirFormat(BzrDirFormat):
+ """Working Tree implementation that uses Subversion working copies."""
_lock_class = TransportLock
@classmethod
def probe_transport(klass, transport):
format = klass()
- if transport.has(svn.wc.get_adm_dir()):
+ if isinstance(transport, LocalTransport) and \
+ transport.has(svn.wc.get_adm_dir()):
+ subr_version = svn.core.svn_subr_version()
+ if subr_version.major == 1 and subr_version.minor < 4:
+ raise NoCheckoutSupport()
return format
raise NotBranchError(path=transport.base)
def initialize_on_transport(self, transport):
raise NotImplementedError(self.initialize_on_transport)
+
+ def get_converter(self, format=None):
+ """See BzrDirFormat.get_converter()."""
+ if format is None:
+ format = BzrDirFormat.get_default_format()
+ return SvnConverter(format)