"""Checkouts and working trees (working copies)."""
import bzrlib, bzrlib.add
-from bzrlib import urlutils
+from bzrlib import osutils, urlutils, ignores
from bzrlib.branch import PullResult
from bzrlib.bzrdir import BzrDirFormat, BzrDir
from bzrlib.errors import (InvalidRevisionId, NotBranchError, NoSuchFile,
from bzrlib.inventory import Inventory, InventoryFile, InventoryLink
from bzrlib.lockable_files import TransportLock, LockableFiles
from bzrlib.lockdir import LockDir
-from bzrlib.osutils import file_kind, fingerprint_file, supports_executable
from bzrlib.revision import NULL_REVISION
from bzrlib.trace import mutter
from bzrlib.revisiontree import RevisionTree
from commit import _revision_id_to_svk_feature
import constants
from convert import SvnConverter
-from errors import LocalCommitsUnsupported, NoSvnRepositoryPresent
+from errors import NoSvnRepositoryPresent
from bzrlib.plugins.svn.mapping import (SVN_PROP_BZR_ANCESTRY, SVN_PROP_BZR_FILEIDS,
SVN_PROP_BZR_REVISION_ID, SVN_PROP_BZR_REVISION_INFO,
generate_revision_metadata)
self._control_files = LockableFiles(control_transport, 'lock', LockDir)
def get_ignore_list(self):
- ignores = set([wc.get_adm_dir()])
- ignores.update(wc.get_default_ignores(svn_config))
+ ignore_globs = set([wc.get_adm_dir()])
+ ignore_globs.update(ignores.get_runtime_ignores())
+ ignore_globs.update(ignores.get_user_ignores())
def dir_add(adm, prefix, patprefix):
ignorestr = adm.prop_get(constants.PROP_IGNORE,
self.abspath(prefix).rstrip("/"))
if ignorestr is not None:
for pat in ignorestr.splitlines():
- ignores.add(urlutils.joinpath(patprefix, pat))
+ ignore_globs.add(urlutils.joinpath(patprefix, pat))
entries = adm.entries_read(False)
for entry in entries:
finally:
adm.close()
- return ignores
+ return ignore_globs
def is_control_filename(self, path):
return wc.is_adm_dir(path)
file = InventoryFile(id, os.path.basename(relpath), parent_id)
file.revision = revid
try:
- data = fingerprint_file(open(self.abspath(relpath)))
+ data = osutils.fingerprint_file(open(self.abspath(relpath)))
file.text_sha1 = data['sha1']
file.text_size = data['size']
file.executable = self.is_executable(id, relpath)
adm.close()
self.base_revid = revid
- def commit(self, message=None, message_callback=None, revprops=None,
- timestamp=None, timezone=None, committer=None, rev_id=None,
- allow_pointless=True, strict=False, verbose=False, local=False,
- reporter=None, config=None, specific_files=None, author=None):
- if author is not None:
- revprops['author'] = author
- # FIXME: Use allow_pointless
- # FIXME: Use verbose
- # FIXME: Use reporter
- # FIXME: Use strict
- if local:
- raise LocalCommitsUnsupported()
-
- if specific_files:
- specific_files = [self.abspath(x).encode('utf8') for x in specific_files]
- else:
- specific_files = [self.basedir.encode('utf8')]
-
- if message_callback is not None:
- def log_message_func(items):
- """ Simple log message provider for unit tests. """
- return message_callback(self).encode("utf-8")
- else:
- assert isinstance(message, basestring)
- def log_message_func(items):
- """ Simple log message provider for unit tests. """
- return message.encode("utf-8")
-
- import client
- self.client_ctx = client.Client()
- self.client_ctx.set_log_msg_func(log_message_func)
- if rev_id is not None:
- extra = "%d %s\n" % (self.branch.revno()+1, rev_id)
- else:
- extra = ""
- adm = self._get_wc(write_lock=True)
- try:
- adm.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
- self._get_bzr_revids(self._get_base_branch_props()) + extra,
- self.basedir)
- adm.prop_set(SVN_PROP_BZR_REVISION_INFO,
- generate_revision_metadata(timestamp,
- timezone,
- committer,
- revprops),
- self.basedir)
- finally:
- adm.close()
-
- try:
- try:
- commit_info = self.client_ctx.commit(specific_files, True, False)
- except SubversionException, (_, num):
- if num == constants.ERR_FS_TXN_OUT_OF_DATE:
- raise OutOfDateTree(self)
- raise
- except:
- # Reset properties so the next subversion commit won't
- # accidently set these properties.
- adm = self._get_wc(write_lock=True)
- base_branch_props = self._get_base_branch_props()
- adm.prop_set(SVN_PROP_BZR_REVISION_ID+str(self.branch.mapping.scheme),
- self._get_bzr_revids(base_branch_props), self.basedir)
- adm.prop_set(SVN_PROP_BZR_REVISION_INFO,
- base_branch_props.get(SVN_PROP_BZR_REVISION_INFO, ""),
- self.basedir)
- adm.close()
- raise
-
- self.client_ctx.set_log_msg_func(None)
-
- revid = self.branch.generate_revision_id(commit_info[0])
-
- self.base_revid = revid
- self.base_revnum = commit_info[0]
- self.base_tree = None
-
- return revid
-
def smart_add(self, file_list, recurse=True, action=None, save=True):
assert isinstance(recurse, bool)
if action is None:
mutter('adding %r' % file_path)
adm.add(file_path, None, 0, None, None, None)
added.append(file_path)
- if recurse and file_kind(file_path) == 'directory':
+ if recurse and osutils.file_kind(file_path) == 'directory':
# Filter out ignored files and update ignored
for c in os.listdir(file_path):
if self.is_control_filename(c):
def get_file_sha1(self, file_id, path=None, stat_value=None):
if not path:
path = self._inventory.id2path(file_id)
- return fingerprint_file(open(self.abspath(path)))['sha1']
+ return osutils.fingerprint_file(open(self.abspath(path)))['sha1']
def _change_fileid_mapping(self, id, path, adm=None):
if adm is None:
merges.append(revid)
self.set_pending_merges(merges)
+ def get_parent_ids(self):
+ return [self.base_revid] + self.pending_merges()
+
def pending_merges(self):
merged = self._get_bzr_merges(self._get_base_branch_props()).splitlines()
adm = self._get_wc()
return []
+ def path_content_summary(self, path, _lstat=os.lstat,
+ _mapper=osutils.file_kind_from_stat_mode):
+ """See Tree.path_content_summary."""
+ abspath = self.abspath(path)
+ try:
+ stat_result = _lstat(abspath)
+ except OSError, e:
+ if getattr(e, 'errno', None) == errno.ENOENT:
+ # no file.
+ return ('missing', None, None, None)
+ # propagate other errors
+ raise
+ kind = _mapper(stat_result.st_mode)
+ if kind == 'file':
+ size = stat_result.st_size
+ # try for a stat cache lookup
+ executable = self._is_executable_from_path_and_stat(path, stat_result)
+ return (kind, size, executable, self._sha_from_stat(
+ path, stat_result))
+ elif kind == 'directory':
+ return kind, None, None, None
+ elif kind == 'symlink':
+ return ('symlink', None, None, os.readlink(abspath))
+ else:
+ return (kind, None, None, None)
+
def _reset_data(self):
pass
finally:
self.branch.unlock()
- if not supports_executable():
+ if not osutils.supports_executable():
def is_executable(self, file_id, path=None):
inv = self.basis_tree()._inventory
if file_id in inv: