# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""Checkouts and working trees (working copies)."""
-from binascii import hexlify
from bzrlib.branch import PullResult
from bzrlib.bzrdir import BzrDirFormat, BzrDir
from bzrlib.errors import (InvalidRevisionId, NotBranchError, NoSuchFile,
NoRepositoryPresent, BzrError)
-from bzrlib.inventory import (Inventory, InventoryDirectory, InventoryFile,
- InventoryLink, ROOT_ID)
+from bzrlib.inventory import Inventory, InventoryFile, InventoryLink
from bzrlib.lockable_files import TransportLock, LockableFiles
from bzrlib.lockdir import LockDir
-from bzrlib.osutils import rand_bytes, fingerprint_file
-from bzrlib.progress import DummyProgress
+from bzrlib.osutils import fingerprint_file
from bzrlib.revision import NULL_REVISION
from bzrlib.trace import mutter
+from bzrlib.tree import RevisionTree
from bzrlib.transport.local import LocalTransport
from bzrlib.workingtree import WorkingTree, WorkingTreeFormat
from branch import SvnBranch
-from repository import (SvnRepository, escape_svn_path, SVN_PROP_BZR_MERGE,
+from convert import SvnConverter
+from repository import (SvnRepository, SVN_PROP_BZR_MERGE,
SVN_PROP_SVK_MERGE, SVN_PROP_BZR_FILEIDS,
revision_id_to_svk_feature)
+from revids import escape_svn_path
from scheme import BranchingScheme
from transport import (SvnRaTransport, svn_config, bzr_to_svn_url,
_create_auth_baton)
import svn.core, svn.wc
from svn.core import SubversionException, Pool
+from errors import NoCheckoutSupport
+
class WorkingTreeInconsistent(BzrError):
_fmt = """Working copy is in inconsistent state (%(min_revnum)d:%(max_revnum)d)"""
class SvnWorkingTree(WorkingTree):
- """Implementation of WorkingTree that uses a Subversion
- Working Copy for storage."""
+ """WorkingTree implementation that uses a Subversion Working Copy for storage."""
def __init__(self, bzrdir, local_path, branch):
self._format = SvnWorkingTreeFormat()
self.basedir = local_path
self.pool = Pool()
self.client_ctx = svn.client.create_context()
self.client_ctx.config = svn_config
- self.client_ctx.log_msg_func2 = svn.client.svn_swig_py_get_commit_log_func
+ self.client_ctx.log_msg_func2 = \
+ svn.client.svn_swig_py_get_commit_log_func
self.client_ctx.auth_baton = _create_auth_baton(self.pool)
- wc = self._get_wc()
+ self._get_wc()
status = svn.wc.revision_status(self.basedir, None, True, None, None)
if status.min_rev != status.max_rev:
#raise WorkingTreeInconsistent(status.min_rev, status.max_rev)
self.read_working_inventory()
- self.controldir = os.path.join(self.basedir, svn.wc.get_adm_dir(), 'bzr')
+ self.controldir = os.path.join(self.basedir, svn.wc.get_adm_dir(),
+ 'bzr')
try:
os.makedirs(self.controldir)
os.makedirs(os.path.join(self.controldir, 'lock'))
except OSError:
pass
- control_transport = bzrdir.transport.clone(os.path.join(svn.wc.get_adm_dir(), 'bzr'))
+ control_transport = bzrdir.transport.clone(os.path.join(
+ svn.wc.get_adm_dir(), 'bzr'))
self._control_files = LockableFiles(control_transport, 'lock', LockDir)
def lock_write(self):
pass
def get_ignore_list(self):
- ignores = [svn.wc.get_adm_dir()] + svn.wc.get_default_ignores(svn_config)
+ ignores = [svn.wc.get_adm_dir()] + \
+ svn.wc.get_default_ignores(svn_config)
def dir_add(wc, prefix):
- ignorestr = svn.wc.prop_get(svn.core.SVN_PROP_IGNORE, self.abspath(prefix).rstrip("/"), wc)
+ ignorestr = svn.wc.prop_get(svn.core.SVN_PROP_IGNORE,
+ self.abspath(prefix).rstrip("/"), wc)
if ignorestr is not None:
for pat in ignorestr.splitlines():
ignores.append("./"+os.path.join(prefix, pat))
subprefix = os.path.join(prefix, entry)
- subwc = svn.wc.adm_open3(wc, self.abspath(subprefix), False, 0, None)
+ subwc = svn.wc.adm_open3(wc, self.abspath(subprefix), False,
+ 0, None)
try:
dir_add(subwc, subprefix)
finally:
return svn.wc.is_adm_dir(path)
def remove(self, files, verbose=False, to_file=None):
+ # FIXME: Use to_file argument
+ # FIXME: Use verbose argument
assert isinstance(files, list)
wc = self._get_wc(write_lock=True)
try:
file = os.path.basename(relpath)
return (self._get_wc(dir, write_lock), file)
- def move(self, from_paths, to_name):
+ def move(self, from_paths, to_dir=None, after=False, **kwargs):
+ # FIXME: Use after argument
revt = svn.core.svn_opt_revision_t()
revt.kind = svn.core.svn_opt_revision_working
for entry in from_paths:
try:
- to_wc = self._get_wc(to_name, write_lock=True)
+ to_wc = self._get_wc(to_dir, write_lock=True)
svn.wc.copy(self.abspath(entry), to_wc,
os.path.basename(entry), None, None)
finally:
svn.wc.delete2(self.abspath(entry), from_wc, None, None, None)
finally:
svn.wc.adm_close(from_wc)
- new_name = "%s/%s" % (to_name, os.path.basename(entry))
+ new_name = "%s/%s" % (to_dir, os.path.basename(entry))
self._change_fileid_mapping(self.inventory.path2id(entry), new_name)
self._change_fileid_mapping(None, entry)
self.read_working_inventory()
- def rename_one(self, from_rel, to_rel):
+ def rename_one(self, from_rel, to_rel, after=False):
+ # FIXME: Use after
revt = svn.core.svn_opt_revision_t()
revt.kind = svn.core.svn_opt_revision_unspecified
(to_wc, to_file) = self._get_rel_wc(to_rel, write_lock=True)
assert isinstance(revnum, int) and revnum >= 0
assert isinstance(path, basestring)
- (bp, rp) = self.branch.repository.scheme.unprefix(path)
+ (_, rp) = self.branch.repository.scheme.unprefix(path)
entry = self.base_tree.id_map[rp]
assert entry[0] is not None
+ assert isinstance(entry[0], str), "fileid %r for %r is not a string" % (entry[0], path)
return entry
def read_working_inventory(self):
entry.schedule == svn.wc.schedule_replace):
# See if the file this file was copied from disappeared
# and has no other copies -> in that case, take id of other file
- if entry.copyfrom_url and list(find_copies(entry.copyfrom_url)) == [relpath]:
- return self.path_to_file_id(entry.copyfrom_rev, entry.revision,
- entry.copyfrom_url[len(entry.repos):])
+ if (entry.copyfrom_url and
+ list(find_copies(entry.copyfrom_url)) == [relpath]):
+ return self.path_to_file_id(entry.copyfrom_rev,
+ entry.revision, entry.copyfrom_url[len(entry.repos):])
ids = self._get_new_file_ids(rootwc)
if ids.has_key(relpath):
return (ids[relpath], None)
def add_dir_to_inv(relpath, wc, parent_id):
entries = svn.wc.entries_read(wc, False)
entry = entries[""]
- assert parent_id is None or isinstance(parent_id, str), "%r is not a string" % parent_id
+ assert parent_id is None or isinstance(parent_id, str), \
+ "%r is not a string" % parent_id
(id, revid) = find_ids(entry, rootwc)
if id is None:
mutter('no id for %r' % entry.url)
return
+ assert revid is None or isinstance(revid, str), "%r is not a string" % revid
assert isinstance(id, str), "%r is not a string" % id
# First handle directory itself
self.base_tree = RevisionTree(self, Inventory(), revid)
return
- (bp, rev) = self.branch.repository.parse_revision_id(revid)
- assert bp == self.branch.branch_path
+ rev = self.branch.lookup_revision_id(revid)
self.base_revnum = rev
self.base_revid = revid
self.base_tree = SvnBasisTree(self)
def update_settings(wc, path):
id = newrevtree.inventory.path2id(path)
mutter("Updating settings for %r" % id)
- (_, revnum) = self.branch.repository.parse_revision_id(
+ revnum = self.branch.lookup_revision_id(
newrevtree.inventory[id].revision)
svn.wc.process_committed2(self.abspath(path).rstrip("/"), wc,
svn.wc.adm_close(wc)
self.base_revid = revid
- def commit(self, message=None, message_callback=None, revprops=None, timestamp=None, timezone=None, committer=None,
- rev_id=None, allow_pointless=True, strict=False, verbose=False, local=False, reporter=None, config=None,
- specific_files=None):
+ def commit(self, message=None, message_callback=None, revprops=None,
+ timestamp=None, timezone=None, committer=None, rev_id=None,
+ allow_pointless=True, strict=False, verbose=False, local=False,
+ reporter=None, config=None, specific_files=None):
+ # FIXME: Use allow_pointless
+ # FIXME: Use committer
+ # FIXME: Use verbose
+ # FIXME: Use reporter
+ # FIXME: Use revprops
+ # FIXME: Raise exception when local is True
+ # FIXME: Use strct
assert timestamp is None
assert timezone is None
assert rev_id is None
return message.encode("utf-8")
self.client_ctx.log_msg_baton2 = log_message_func
- commit_info = svn.client.commit3(specific_files, True, False, self.client_ctx)
+ commit_info = svn.client.commit3(specific_files, True, False,
+ self.client_ctx)
self.client_ctx.log_msg_baton2 = None
revid = self.branch.repository.generate_revision_id(
return revid
- def add(self, files, ids=None):
+ def add(self, files, ids=None, kinds=None):
+ # FIXME: Use kinds
if isinstance(files, str):
files = [files]
if isinstance(ids, str):
return self.base_tree
- def pull(self, source, overwrite=False, stop_revision=None, delta_reporter=None):
+ def pull(self, source, overwrite=False, stop_revision=None,
+ delta_reporter=None):
+ # FIXME: Use delta_reporter
+ # FIXME: Use overwrite
result = PullResult()
result.source_branch = source
result.master_branch = None
stop_revision = self.branch.last_revision()
rev = svn.core.svn_opt_revision_t()
rev.kind = svn.core.svn_opt_revision_number
- rev.value.number = self.branch.repository.parse_revision_id(stop_revision)[1]
+ rev.value.number = self.branch.lookup_revision_id(stop_revision)
fetched = svn.client.update(self.basedir, rev, True, self.client_ctx)
self.base_revid = self.branch.repository.generate_revision_id(fetched, self.branch.branch_path)
result.new_revid = self.branch.generate_revision_id(fetched)
if new_entries.has_key(path):
del new_entries[path]
else:
+ assert isinstance(id, str)
new_entries[path] = id
committed = self.branch.repository.branchprop_list.get_property(
self.branch.branch_path,
def _get_new_file_ids(self, wc):
committed = self.branch.repository.branchprop_list.get_property(
- self.branch.branch_path,
- self.base_revnum,
+ self.branch.branch_path, self.base_revnum,
SVN_PROP_BZR_FILEIDS, "")
existing = svn.wc.prop_get(SVN_PROP_BZR_FILEIDS, self.basedir, wc)
if existing is None:
return {}
else:
- return dict(map(lambda x: x.split("\t"), existing[len(committed):].splitlines()))
+ return dict(map(lambda x: str(x).split("\t"),
+ existing[len(committed):].splitlines()))
def _get_bzr_merges(self):
return self.branch.repository.branchprop_list.get_property(
- self.branch.branch_path,
- self.base_revnum,
+ self.branch.branch_path, self.base_revnum,
SVN_PROP_BZR_MERGE, "")
def _get_svk_merges(self):
return self.branch.repository.branchprop_list.get_property(
- self.branch.branch_path,
- self.base_revnum,
+ self.branch.branch_path, self.base_revnum,
SVN_PROP_SVK_MERGE, "")
def set_pending_merges(self, merges):
def add_pending_merge(self, revid):
merges = self.pending_merges()
merges.append(revid)
- self.set_pending_merges(existing)
+ self.set_pending_merges(merges)
def pending_merges(self):
merged = self._get_bzr_merges().splitlines()
class SvnWorkingTreeFormat(WorkingTreeFormat):
+ """Subversion working copy format."""
def get_format_description(self):
return "Subversion Working Copy"
if not self.scheme.is_branch(self.branch_path) and not self.scheme.is_tag(self.branch_path):
raise NotBranchError(path=self.transport.base)
- def clone(self, path):
+ def clone(self, path, revision_id=None, force_new_repo=False):
raise NotImplementedError(self.clone)
- def open_workingtree(self, _unsupported=False):
+ def open_workingtree(self, _unsupported=False, recommend_upgrade=False):
return SvnWorkingTree(self, self.local_path, self.open_branch())
- def sprout(self, url, revision_id=None, basis=None, force_new_repo=False):
+ def sprout(self, url, revision_id=None, force_new_repo=False,
+ recurse='down'):
# FIXME: honor force_new_repo
+ # FIXME: Use recurse
result = BzrDirFormat.get_default_format().initialize(url)
repo = self.find_repository()
- result_repo = repo.clone(result, revision_id, basis)
+ repo.clone(result, revision_id)
branch = self.open_branch()
branch.sprout(result, revision_id)
result.create_workingtree()
try:
branch = SvnBranch(self.root_transport.base, repos, self.branch_path)
- except SubversionException, (msg, num):
+ except SubversionException, (_, num):
if num == svn.core.SVN_ERR_WC_NOT_DIRECTORY:
- raise NotBranchError(path=self.url)
+ raise NotBranchError(path=self.base)
raise
branch.bzrdir = self
if isinstance(transport, LocalTransport) and \
transport.has(svn.wc.get_adm_dir()):
+ subr_version = svn.core.svn_subr_version()
+ if subr_version.major == 1 and subr_version.minor < 4:
+ raise NoCheckoutSupport()
return format
raise NotBranchError(path=transport.base)
def initialize_on_transport(self, transport):
raise NotImplementedError(self.initialize_on_transport)
+
+ def get_converter(self, format=None):
+ """See BzrDirFormat.get_converter()."""
+ if format is None:
+ format = BzrDirFormat.get_default_format()
+ return SvnConverter(format)