# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
+# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Checkouts and working trees (working copies)."""
-import bzrlib
-from bzrlib import urlutils
+import bzrlib, bzrlib.add
+from bzrlib import osutils, urlutils
from bzrlib.branch import PullResult
from bzrlib.bzrdir import BzrDirFormat, BzrDir
from bzrlib.errors import (InvalidRevisionId, NotBranchError, NoSuchFile,
from bzrlib.inventory import Inventory, InventoryFile, InventoryLink
from bzrlib.lockable_files import TransportLock, LockableFiles
from bzrlib.lockdir import LockDir
-from bzrlib.osutils import file_kind, fingerprint_file, supports_executable
from bzrlib.revision import NULL_REVISION
from bzrlib.trace import mutter
-from bzrlib.tree import RevisionTree
+from bzrlib.revisiontree import RevisionTree
from bzrlib.transport.local import LocalTransport
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat
-from branch import SvnBranch
-from convert import SvnConverter
-from errors import LocalCommitsUnsupported, NoSvnRepositoryPresent
-from mapping import (SVN_PROP_BZR_ANCESTRY, SVN_PROP_BZR_FILEIDS,
+from bzrlib.plugins.svn import core, properties
+from bzrlib.plugins.svn.auth import create_auth_baton
+from bzrlib.plugins.svn.branch import SvnBranch
+from bzrlib.plugins.svn.client import Client
+from bzrlib.plugins.svn.commit import _revision_id_to_svk_feature
+from bzrlib.plugins.svn.convert import SvnConverter
+from bzrlib.plugins.svn.core import SubversionException
+from bzrlib.plugins.svn.errors import LocalCommitsUnsupported, NoSvnRepositoryPresent, ERR_FS_TXN_OUT_OF_DATE, ERR_ENTRY_EXISTS, ERR_WC_PATH_NOT_FOUND, ERR_WC_NOT_DIRECTORY
+from bzrlib.plugins.svn.format import get_rich_root_format
+from bzrlib.plugins.svn.mapping import (SVN_PROP_BZR_ANCESTRY, SVN_PROP_BZR_FILEIDS,
SVN_PROP_BZR_REVISION_ID, SVN_PROP_BZR_REVISION_INFO,
- generate_revision_metadata)
-from remote import SvnRemoteAccess
-from repository import SvnRepository
-from svk import SVN_PROP_SVK_MERGE, parse_svk_features
-from mapping import escape_svn_path
-from scheme import BranchingScheme
-from transport import (SvnRaTransport, bzr_to_svn_url, create_svn_client,
+ escape_svn_path, generate_revision_metadata)
+from bzrlib.plugins.svn.remote import SvnRemoteAccess
+from bzrlib.plugins.svn.repository import SvnRepository
+from bzrlib.plugins.svn.svk import SVN_PROP_SVK_MERGE, parse_svk_features, serialize_svk_features
+from bzrlib.plugins.svn.transport import (SvnRaTransport, bzr_to_svn_url,
svn_config)
-from tree import SvnBasisTree
+from bzrlib.plugins.svn.tree import SvnBasisTree
+from bzrlib.plugins.svn.wc import *
import os
import urllib
-import svn.core, svn.wc
-from svn.core import SubversionException, Pool
-
-from errors import NoCheckoutSupport
-from format import get_rich_root_format
-
-class WorkingTreeInconsistent(BzrError):
- _fmt = """Working copy is in inconsistent state (%(min_revnum)d:%(max_revnum)d)"""
-
- def __init__(self, min_revnum, max_revnum):
- self.min_revnum = min_revnum
- self.max_revnum = max_revnum
+def generate_ignore_list(ignore_map):
+ """Create a list of ignores, ordered by directory.
+
+ :param ignore_map: Dictionary with paths as keys, patterns as values.
+ :return: list of ignores
+ """
+ ignores = []
+ keys = ignore_map.keys()
+ keys.sort()
+ for k in keys:
+ ignores.append("./" + os.path.join(k.strip("/"), ignore_map[k].strip("/")))
+ return ignores
class SvnWorkingTree(WorkingTree):
"""WorkingTree implementation that uses a Subversion Working Copy for storage."""
def __init__(self, bzrdir, local_path, branch):
- self._format = SvnWorkingTreeFormat()
+ version = check_wc(local_path)
+ self._format = SvnWorkingTreeFormat(version)
self.basedir = local_path
+ assert isinstance(self.basedir, unicode)
self.bzrdir = bzrdir
self._branch = branch
self.base_revnum = 0
- self.pool = Pool()
- self.client_ctx = create_svn_client(self.pool)
- self.client_ctx.log_msg_func2 = \
- svn.client.svn_swig_py_get_commit_log_func
+ self.client_ctx = Client(auth=create_auth_baton(bzrdir.svn_url))
self._get_wc()
- status = svn.wc.revision_status(self.basedir, None, True, None, None)
- if status.min_rev != status.max_rev:
- #raise WorkingTreeInconsistent(status.min_rev, status.max_rev)
- rev = svn.core.svn_opt_revision_t()
- rev.kind = svn.core.svn_opt_revision_number
- rev.value.number = status.max_rev
- assert status.max_rev == svn.client.update(self.basedir, rev,
- True, self.client_ctx, Pool())
-
- self.base_revnum = status.max_rev
+ max_rev = revision_status(self.basedir, None, True)[1]
+ self.base_revnum = max_rev
self.base_tree = SvnBasisTree(self)
self.base_revid = branch.generate_revision_id(self.base_revnum)
self.read_working_inventory()
- self.controldir = os.path.join(self.basedir, svn.wc.get_adm_dir(),
+ self.controldir = os.path.join(self.basedir, get_adm_dir(),
'bzr')
try:
os.makedirs(self.controldir)
os.makedirs(os.path.join(self.controldir, 'lock'))
except OSError:
pass
- control_transport = bzrdir.transport.clone(os.path.join(
- svn.wc.get_adm_dir(), 'bzr'))
+ control_transport = bzrdir.transport.clone(urlutils.join(
+ get_adm_dir(), 'bzr'))
self._control_files = LockableFiles(control_transport, 'lock', LockDir)
def get_ignore_list(self):
- ignores = set([svn.wc.get_adm_dir()])
- ignores.update(svn.wc.get_default_ignores(svn_config))
+ ignores = set([get_adm_dir()])
+ ignores.update(svn_config.get_default_ignores())
def dir_add(wc, prefix, patprefix):
- ignorestr = svn.wc.prop_get(svn.core.SVN_PROP_IGNORE,
- self.abspath(prefix).rstrip("/"), wc)
+ ignorestr = wc.prop_get(properties.PROP_IGNORE,
+ self.abspath(prefix).rstrip("/"))
if ignorestr is not None:
for pat in ignorestr.splitlines():
ignores.add(urlutils.joinpath(patprefix, pat))
- entries = svn.wc.entries_read(wc, False)
+ entries = wc.entries_read(False)
for entry in entries:
if entry == "":
continue
- if entries[entry].kind != svn.core.svn_node_dir:
+ # Ignore ignores on things that aren't directories
+ if entries[entry].kind != core.NODE_DIR:
continue
subprefix = os.path.join(prefix, entry)
- subwc = svn.wc.adm_open3(wc, self.abspath(subprefix), False,
- 0, None)
+ subwc = WorkingCopy(wc, self.abspath(subprefix))
try:
dir_add(subwc, subprefix, urlutils.joinpath(patprefix, entry))
finally:
- svn.wc.adm_close(subwc)
+ subwc.close()
wc = self._get_wc()
try:
dir_add(wc, "", ".")
finally:
- svn.wc.adm_close(wc)
+ wc.close()
return ignores
def is_control_filename(self, path):
- return svn.wc.is_adm_dir(path)
+ return is_adm_dir(path)
def apply_inventory_delta(self, changes):
raise NotImplementedError(self.apply_inventory_delta)
def update(self, change_reporter=None):
- rev = svn.core.svn_opt_revision_t()
- rev.kind = svn.core.svn_opt_revision_head
- svn.client.update(self.basedir, rev, True, self.client_ctx)
+ self.client_ctx.update([self.basedir.encode("utf-8")], "HEAD", True)
def remove(self, files, verbose=False, to_file=None):
# FIXME: Use to_file argument
wc = self._get_wc(write_lock=True)
try:
for file in files:
- svn.wc.delete2(self.abspath(file), wc, None, None, None)
+ wc.delete(self.abspath(file))
finally:
- svn.wc.adm_close(wc)
+ wc.close()
for file in files:
self._change_fileid_mapping(None, file)
self.read_working_inventory()
def _get_wc(self, relpath="", write_lock=False):
- return svn.wc.adm_open3(None, self.abspath(relpath).rstrip("/"),
- write_lock, 0, None)
+ return WorkingCopy(None, self.abspath(relpath).rstrip("/"),
+ write_lock)
def _get_rel_wc(self, relpath, write_lock=False):
dir = os.path.dirname(relpath)
def move(self, from_paths, to_dir=None, after=False, **kwargs):
# FIXME: Use after argument
assert after != True
- revt = svn.core.svn_opt_revision_t()
- revt.kind = svn.core.svn_opt_revision_working
for entry in from_paths:
try:
to_wc = self._get_wc(to_dir, write_lock=True)
- svn.wc.copy(self.abspath(entry), to_wc,
- os.path.basename(entry), None, None)
+ to_wc.copy(self.abspath(entry), os.path.basename(entry))
finally:
- svn.wc.adm_close(to_wc)
+ to_wc.close()
try:
from_wc = self._get_wc(write_lock=True)
- svn.wc.delete2(self.abspath(entry), from_wc, None, None, None)
+ from_wc.delete(self.abspath(entry))
finally:
- svn.wc.adm_close(from_wc)
+ from_wc.close()
new_name = urlutils.join(to_dir, os.path.basename(entry))
self._change_fileid_mapping(self.inventory.path2id(entry), new_name)
self._change_fileid_mapping(None, entry)
def rename_one(self, from_rel, to_rel, after=False):
# FIXME: Use after
assert after != True
- revt = svn.core.svn_opt_revision_t()
- revt.kind = svn.core.svn_opt_revision_unspecified
(to_wc, to_file) = self._get_rel_wc(to_rel, write_lock=True)
if os.path.dirname(from_rel) == os.path.dirname(to_rel):
# Prevent lock contention
(from_wc, _) = self._get_rel_wc(from_rel, write_lock=True)
from_id = self.inventory.path2id(from_rel)
try:
- svn.wc.copy(self.abspath(from_rel), to_wc, to_file, None, None)
- svn.wc.delete2(self.abspath(from_rel), from_wc, None, None, None)
+ to_wc.copy(self.abspath(from_rel), to_file)
+ from_wc.delete(self.abspath(from_rel))
finally:
- svn.wc.adm_close(to_wc)
+ to_wc.close()
self._change_fileid_mapping(None, from_rel)
self._change_fileid_mapping(from_id, to_rel)
self.read_working_inventory()
assert isinstance(path, str)
rp = self.branch.unprefix(path)
- entry = self.base_tree.id_map[rp]
+ entry = self.basis_tree().id_map[rp]
assert entry[0] is not None
assert isinstance(entry[0], str), "fileid %r for %r is not a string" % (entry[0], path)
return entry
file = InventoryFile(id, os.path.basename(relpath), parent_id)
file.revision = revid
try:
- data = fingerprint_file(open(self.abspath(relpath)))
+ data = osutils.fingerprint_file(open(self.abspath(relpath)))
file.text_sha1 = data['sha1']
file.text_size = data['size']
file.executable = self.is_executable(id, relpath)
def find_copies(url, relpath=""):
wc = self._get_wc(relpath)
- entries = svn.wc.entries_read(wc, False)
+ entries = wc.entries_read(False)
for entry in entries.values():
subrelpath = os.path.join(relpath, entry.name)
if entry.name == "" or entry.kind != 'directory':
if ((entry.copyfrom_url == url or entry.url == url) and
- not (entry.schedule in (svn.wc.schedule_delete,
- svn.wc.schedule_replace))):
+ not (entry.schedule in (SCHEDULE_DELETE,
+ SCHEDULE_REPLACE))):
yield os.path.join(
self.branch.get_branch_path().strip("/"),
subrelpath)
else:
find_copies(subrelpath)
- svn.wc.adm_close(wc)
+ wc.close()
def find_ids(entry, rootwc):
relpath = urllib.unquote(entry.url[len(entry.repos):].strip("/"))
- assert entry.schedule in (svn.wc.schedule_normal,
- svn.wc.schedule_delete,
- svn.wc.schedule_add,
- svn.wc.schedule_replace)
- if entry.schedule == svn.wc.schedule_normal:
+ assert entry.schedule in (SCHEDULE_NORMAL,
+ SCHEDULE_DELETE,
+ SCHEDULE_ADD,
+ SCHEDULE_REPLACE)
+ if entry.schedule == SCHEDULE_NORMAL:
assert entry.revision >= 0
# Keep old id
return self.path_to_file_id(entry.cmt_rev, entry.revision,
relpath)
- elif entry.schedule == svn.wc.schedule_delete:
+ elif entry.schedule == SCHEDULE_DELETE:
return (None, None)
- elif (entry.schedule == svn.wc.schedule_add or
- entry.schedule == svn.wc.schedule_replace):
+ elif (entry.schedule == SCHEDULE_ADD or
+ entry.schedule == SCHEDULE_REPLACE):
# See if the file this file was copied from disappeared
# and has no other copies -> in that case, take id of other file
if (entry.copyfrom_url and
def add_dir_to_inv(relpath, wc, parent_id):
assert isinstance(relpath, unicode)
- entries = svn.wc.entries_read(wc, False)
+ entries = wc.entries_read(False)
entry = entries[""]
assert parent_id is None or isinstance(parent_id, str), \
"%r is not a string" % parent_id
(id, revid) = find_ids(entry, rootwc)
if id is None:
- mutter('no id for %r' % entry.url)
+ mutter('no id for %r', entry.url)
return
assert revid is None or isinstance(revid, str), "%r is not a string" % revid
assert isinstance(id, str), "%r is not a string" % id
entry = entries[name]
assert entry
- if entry.kind == svn.core.svn_node_dir:
- subwc = svn.wc.adm_open3(wc, self.abspath(subrelpath),
- False, 0, None)
+ if entry.kind == core.NODE_DIR:
+ subwc = WorkingCopy(wc, self.abspath(subrelpath))
try:
add_dir_to_inv(subrelpath, subwc, id)
finally:
- svn.wc.adm_close(subwc)
+ subwc.close()
else:
(subid, subrevid) = find_ids(entry, rootwc)
if subid:
add_file_to_inv(subrelpath, subid, subrevid, id)
else:
- mutter('no id for %r' % entry.url)
+ mutter('no id for %r', entry.url)
rootwc = self._get_wc()
try:
add_dir_to_inv(u"", rootwc, None)
finally:
- svn.wc.adm_close(rootwc)
+ rootwc.close()
self._set_inventory(inv, dirty=False)
return inv
def set_last_revision(self, revid):
- mutter('setting last revision to %r' % revid)
+ mutter('setting last revision to %r', revid)
if revid is None or revid == NULL_REVISION:
self.base_revid = revid
self.base_revnum = 0
def update_settings(wc, path):
id = newrevtree.inventory.path2id(path)
- mutter("Updating settings for %r" % id)
+ mutter("Updating settings for %r", id)
revnum = self.branch.lookup_revision_id(
newrevtree.inventory[id].revision)
- svn.wc.process_committed2(self.abspath(path).rstrip("/"), wc,
+ wc.process_committed(self.abspath(path).rstrip("/"),
False, revnum,
- svn.core.svn_time_to_cstring(newrev.timestamp),
- newrev.committer, None, False)
+ properties.time_to_cstring(newrev.timestamp),
+ newrev.committer)
if newrevtree.inventory[id].kind != 'directory':
return
- entries = svn.wc.entries_read(wc, True)
+ entries = wc.entries_read(True)
for entry in entries:
if entry == "":
continue
- subwc = svn.wc.adm_open3(wc, os.path.join(self.basedir, path, entry), False, 0, None)
+ subwc = WorkingCopy(wc, os.path.join(self.basedir, path, entry))
try:
update_settings(subwc, os.path.join(path, entry))
finally:
- svn.wc.adm_close(subwc)
+ subwc.close()
# Set proper version for all files in the wc
wc = self._get_wc(write_lock=True)
try:
update_settings(wc, "")
finally:
- svn.wc.adm_close(wc)
+ wc.close()
self.base_revid = revid
def commit(self, message=None, message_callback=None, revprops=None,
specific_files = [self.basedir.encode('utf8')]
if message_callback is not None:
- def log_message_func(items, pool):
+ def log_message_func(items):
""" Simple log message provider for unit tests. """
return message_callback(self).encode("utf-8")
else:
assert isinstance(message, basestring)
- def log_message_func(items, pool):
+ def log_message_func(items):
""" Simple log message provider for unit tests. """
return message.encode("utf-8")
- self.client_ctx.log_msg_baton2 = log_message_func
+ self.client_ctx.log_msg_func = log_message_func
if rev_id is not None:
extra = "%d %s\n" % (self.branch.revno()+1, rev_id)
else:
extra = ""
wc = self._get_wc(write_lock=True)
try:
- svn.wc.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
- self._get_bzr_revids() + extra,
- self.basedir, wc)
- svn.wc.prop_set(SVN_PROP_BZR_REVISION_INFO,
+ wc.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
+ self._get_bzr_revids(self._get_base_branch_props()) + extra,
+ self.basedir)
+ wc.prop_set(SVN_PROP_BZR_REVISION_INFO,
generate_revision_metadata(timestamp,
timezone,
committer,
revprops),
- self.basedir, wc)
+ self.basedir)
finally:
- svn.wc.adm_close(wc)
+ wc.close()
try:
try:
- commit_info = svn.client.commit3(specific_files, True, False,
- self.client_ctx)
+ (revision, _, _) = self.client_ctx.commit(specific_files, True, False)
except SubversionException, (_, num):
- if num == svn.core.SVN_ERR_FS_TXN_OUT_OF_DATE:
+ if num == ERR_FS_TXN_OUT_OF_DATE:
raise OutOfDateTree(self)
raise
except:
# Reset properties so the next subversion commit won't
# accidently set these properties.
wc = self._get_wc(write_lock=True)
- svn.wc.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
- self._get_bzr_revids(), self.basedir, wc)
- svn.wc.prop_set(SVN_PROP_BZR_REVISION_INFO,
- self.branch.repository.branchprop_list.get_property(
- self.branch.get_branch_path(self.base_revnum),
- self.base_revnum,
- SVN_PROP_BZR_REVISION_INFO, ""),
- self.basedir, wc)
- svn.wc.adm_close(wc)
+ base_branch_props = self._get_base_branch_props()
+ wc.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
+ self._get_bzr_revids(base_branch_props), self.basedir)
+ wc.prop_set(SVN_PROP_BZR_REVISION_INFO,
+ base_branch_props.get(SVN_PROP_BZR_REVISION_INFO, ""),
+ self.basedir)
+ wc.close()
raise
- self.client_ctx.log_msg_baton2 = None
+ self.client_ctx.log_msg_func = None
- revid = self.branch.generate_revision_id(commit_info.revision)
+ revid = self.branch.generate_revision_id(revision)
self.base_revid = revid
- self.base_revnum = commit_info.revision
+ self.base_revnum = revision
self.base_tree = SvnBasisTree(self)
return revid
try:
if not self.inventory.has_filename(f):
if save:
- mutter('adding %r' % file_path)
- svn.wc.add2(file_path, wc, None, 0, None, None, None)
+ mutter('adding %r', file_path)
+ wc.add(file_path)
added.append(file_path)
- if recurse and file_kind(file_path) == 'directory':
+ if recurse and osutils.file_kind(file_path) == 'directory':
# Filter out ignored files and update ignored
for c in os.listdir(file_path):
if self.is_control_filename(c):
ignored.setdefault(ignore_glob, []).append(c_path)
todo.append(c_path)
finally:
- svn.wc.adm_close(wc)
+ wc.close()
if todo != []:
cadded, cignored = self.smart_add(todo, recurse, action, save)
added.extend(cadded)
wc = self._get_wc(os.path.dirname(f), write_lock=True)
try:
try:
- svn.wc.add2(os.path.join(self.basedir, f), wc, None, 0,
- None, None, None)
+ wc.add(os.path.join(self.basedir, f))
if ids is not None:
self._change_fileid_mapping(ids.next(), f, wc)
except SubversionException, (_, num):
- if num == svn.core.SVN_ERR_ENTRY_EXISTS:
+ if num == ERR_ENTRY_EXISTS:
continue
- elif num == svn.core.SVN_ERR_WC_PATH_NOT_FOUND:
+ elif num == ERR_WC_PATH_NOT_FOUND:
raise NoSuchFile(path=f)
raise
finally:
- svn.wc.adm_close(wc)
+ wc.close()
self.read_working_inventory()
def basis_tree(self):
(result.old_revno, result.old_revid) = self.branch.last_revision_info()
if stop_revision is None:
stop_revision = self.branch.last_revision()
- rev = svn.core.svn_opt_revision_t()
- rev.kind = svn.core.svn_opt_revision_number
- rev.value.number = self.branch.lookup_revision_id(stop_revision)
- fetched = svn.client.update(self.basedir, rev, True, self.client_ctx)
+ rev = self.branch.lookup_revision_id(stop_revision)
+ fetched = self.client_ctx.update(self.basedir, rev, True)
self.base_revid = self.branch.generate_revision_id(fetched)
result.new_revid = self.base_revid
result.new_revno = self.branch.revision_id_to_revno(result.new_revid)
def get_file_sha1(self, file_id, path=None, stat_value=None):
if not path:
path = self._inventory.id2path(file_id)
- return fingerprint_file(open(self.abspath(path)))['sha1']
+ return osutils.fingerprint_file(open(self.abspath(path)))['sha1']
def _change_fileid_mapping(self, id, path, wc=None):
if wc is None:
new_entries[path] = id
existing = "".join(map(lambda (path, id): "%s\t%s\n" % (path, id), new_entries.items()))
if existing != "":
- svn.wc.prop_set(SVN_PROP_BZR_FILEIDS, existing.encode("utf-8"), self.basedir, subwc)
+ subwc.prop_set(SVN_PROP_BZR_FILEIDS, existing.encode("utf-8"), self.basedir)
if wc is None:
- svn.wc.adm_close(subwc)
+ subwc.close()
+
+ def _get_base_branch_props(self):
+ return self.branch.repository.branchprop_list.get_properties(
+ self.branch.get_branch_path(self.base_revnum), self.base_revnum)
def _get_new_file_ids(self, wc):
- committed = self.branch.repository.branchprop_list.get_property(
- self.branch.get_branch_path(self.base_revnum), self.base_revnum,
- SVN_PROP_BZR_FILEIDS, "")
- existing = svn.wc.prop_get(SVN_PROP_BZR_FILEIDS, self.basedir, wc)
+ committed = self._get_base_branch_props().get(SVN_PROP_BZR_FILEIDS, "")
+ existing = wc.prop_get(SVN_PROP_BZR_FILEIDS, self.basedir)
if existing is None or committed == existing:
return {}
return dict(map(lambda x: str(x).split("\t"),
existing.splitlines()))
- def _get_bzr_revids(self):
- return self.branch.repository.branchprop_list.get_property(
- self.branch.get_branch_path(self.base_revnum),
- self.base_revnum,
- SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme), "")
+ def _get_bzr_revids(self, base_branch_props):
+ return base_branch_props.get(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme), "")
- def _get_bzr_merges(self):
- return self.branch.repository.branchprop_list.get_property(
- self.branch.get_branch_path(self.base_revnum),
- self.base_revnum, SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme), "")
+ def _get_bzr_merges(self, base_branch_props):
+ return base_branch_props.get(SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme), "")
- def _get_svk_merges(self):
- return self.branch.repository.branchprop_list.get_property(
- self.branch.get_branch_path(self.base_revnum),
- self.base_revnum, SVN_PROP_SVK_MERGE, "")
+ def _get_svk_merges(self, base_branch_props):
+ return base_branch_props.get(SVN_PROP_SVK_MERGE, "")
def set_pending_merges(self, merges):
"""See MutableTree.set_pending_merges()."""
else:
bzr_merge = ""
- svn.wc.prop_set(SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme),
- self._get_bzr_merges() + bzr_merge,
- self.basedir, wc)
+ wc.prop_set(SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme),
+ self._get_bzr_merges(self._get_base_branch_props()) + bzr_merge,
+ self.basedir)
- svk_merges = parse_svk_features(self._get_svk_merges())
+ svk_merges = parse_svk_features(self._get_svk_merges(self._get_base_branch_props()))
# Set svk:merge
for merge in merges:
try:
- svk_merges.add(self.branch.mapping.revision_id_to_svk_feature(merge))
+ svk_merges.add(_revision_id_to_svk_feature(merge))
except InvalidRevisionId:
pass
- svn.wc.prop_set2(SVN_PROP_SVK_MERGE,
- serialize_svk_features(svk_merges), self.basedir,
- wc, False)
+ wc.prop_set(SVN_PROP_SVK_MERGE,
+ serialize_svk_features(svk_merges), self.basedir)
finally:
- svn.wc.adm_close(wc)
+ wc.close()
def add_pending_merge(self, revid):
merges = self.pending_merges()
merges.append(revid)
self.set_pending_merges(merges)
+ def get_parent_ids(self):
+ return [self.base_revid] + self.pending_merges()
+
def pending_merges(self):
- merged = self._get_bzr_merges().splitlines()
+ merged = self._get_bzr_merges(self._get_base_branch_props()).splitlines()
wc = self._get_wc()
try:
- merged_data = svn.wc.prop_get(
- SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme), self.basedir, wc)
+ merged_data = wc.prop_get(
+ SVN_PROP_BZR_ANCESTRY+str(self.branch.mapping.scheme), self.basedir)
if merged_data is None:
set_merged = []
else:
set_merged = merged_data.splitlines()
finally:
- svn.wc.adm_close(wc)
+ wc.close()
assert (len(merged) == len(set_merged) or
len(merged)+1 == len(set_merged))
return []
+ def path_content_summary(self, path, _lstat=os.lstat,
+ _mapper=osutils.file_kind_from_stat_mode):
+ """See Tree.path_content_summary."""
+ abspath = self.abspath(path)
+ try:
+ stat_result = _lstat(abspath)
+ except OSError, e:
+ if getattr(e, 'errno', None) == errno.ENOENT:
+ # no file.
+ return ('missing', None, None, None)
+ # propagate other errors
+ raise
+ kind = _mapper(stat_result.st_mode)
+ if kind == 'file':
+ size = stat_result.st_size
+ # try for a stat cache lookup
+ executable = self._is_executable_from_path_and_stat(path, stat_result)
+ return (kind, size, executable, self._sha_from_stat(
+ path, stat_result))
+ elif kind == 'directory':
+ return kind, None, None, None
+ elif kind == 'symlink':
+ return ('symlink', None, None, os.readlink(abspath))
+ else:
+ return (kind, None, None, None)
+
def _reset_data(self):
pass
finally:
self.branch.unlock()
- if not supports_executable():
- def _is_executable_from_path_and_stat_from_basis(self, path, stat_result):
- file_id = self.basis_tree().path2(path)
- return self.basis_tree()._inventory[file_id].executable
-
+ if not osutils.supports_executable():
def is_executable(self, file_id, path=None):
- return self.basis_tree()._inventory[file_id].executable
+ inv = self.basis_tree()._inventory
+ if file_id in inv:
+ return inv[file_id].executable
+ # Default to not executable
+ return False
class SvnWorkingTreeFormat(WorkingTreeFormat):
"""Subversion working copy format."""
+ def __init__(self, version):
+ self.version = version
+
def __get_matchingbzrdir(self):
return SvnWorkingTreeDirFormat()
_matchingbzrdir = property(__get_matchingbzrdir)
def get_format_description(self):
- return "Subversion Working Copy"
+ return "Subversion Working Copy Version %d" % self.version
def get_format_string(self):
- return "Subversion Working Copy Format"
+ raise NotImplementedError
def initialize(self, a_bzrdir, revision_id=None):
raise NotImplementedError(self.initialize)
self.local_path = transport.local_abspath(".")
# Open related remote repository + branch
- wc = svn.wc.adm_open3(None, self.local_path, False, 0, None)
try:
- svn_url = svn.wc.entry(self.local_path, wc, True).url
+ wc = WorkingCopy(None, self.local_path)
+ except SubversionException, (msg, ERR_WC_UNSUPPORTED_FORMAT):
+ raise UnsupportedFormatError(msg, kind='workingtree')
+ try:
+ self.svn_url = wc.entry(self.local_path, True).url
finally:
- svn.wc.adm_close(wc)
+ wc.close()
- remote_transport = SvnRaTransport(svn_url)
- self.remote_bzrdir = SvnRemoteAccess(remote_transport)
- self.svn_root_transport = remote_transport.clone_root()
+ self.remote_transport = SvnRaTransport(self.svn_url)
+ self.remote_bzrdir = SvnRemoteAccess(self.remote_transport)
+ self.svn_root_transport = self.remote_transport.clone_root()
self.root_transport = self.transport = transport
def clone(self, path, revision_id=None, force_new_repo=False):
return SvnWorkingTree(self, self.local_path, self.open_branch())
def sprout(self, url, revision_id=None, force_new_repo=False,
- recurse='down', possible_transports=None, accelerator_tree=None):
+ recurse='down', possible_transports=None, accelerator_tree=None,
+ hardlink=False):
# FIXME: honor force_new_repo
# FIXME: Use recurse
result = get_rich_root_format().initialize(url)
- repo = self.find_repository()
+ repo = self._find_repository()
repo.clone(result, revision_id)
branch = self.open_branch()
branch.sprout(result, revision_id)
- result.create_workingtree()
+ result.create_workingtree(hardlink=hardlink)
return result
def open_repository(self):
raise NoRepositoryPresent(self)
def find_repository(self):
+ raise NoRepositoryPresent(self)
+
+ def _find_repository(self):
return SvnRepository(self, self.svn_root_transport,
self.remote_bzrdir.branch_path)
format = BzrDirFormat.get_default_format()
return not isinstance(self._format, format.__class__)
- def create_workingtree(self, revision_id=None):
+ def create_workingtree(self, revision_id=None, hardlink=None):
"""See BzrDir.create_workingtree().
Not implemented for Subversion because having a .svn directory
def open_branch(self, unsupported=True):
"""See BzrDir.open_branch()."""
- repos = self.find_repository()
+ repos = self._find_repository()
try:
- branch = SvnBranch(self.svn_root_transport.base, repos,
+ branch = SvnBranch(self.remote_transport.base, repos,
self.remote_bzrdir.branch_path)
except SubversionException, (_, num):
- if num == svn.core.SVN_ERR_WC_NOT_DIRECTORY:
+ if num == ERR_WC_NOT_DIRECTORY:
raise NotBranchError(path=self.base)
raise
branch.bzrdir = self.remote_bzrdir
return branch
-
-
-