--- /dev/null
- class FakeControlFiles(object):
- """Dummy implementation of ControlFiles.
-
- This is required as some code relies on controlfiles being
- available."""
- def get_utf8(self, name):
- raise NoSuchFile(name)
-
- def get(self, name):
- raise NoSuchFile(name)
-
- def break_lock(self):
- pass
-
-
+# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""Handles branch-specific operations."""
+
+from bzrlib import ui, urlutils
+from bzrlib.branch import Branch, BranchFormat, BranchCheckResult, PullResult
+from bzrlib.bzrdir import BzrDir
+from bzrlib.errors import (NoSuchFile, DivergedBranches, NoSuchRevision,
+ NotBranchError, UnstackableBranchFormat)
+from bzrlib.revision import is_null, ensure_null
+from bzrlib.workingtree import WorkingTree
+
+from bzrlib.plugins.svn import core, wc
+from bzrlib.plugins.svn.commit import push, push_ancestors
+from bzrlib.plugins.svn.config import BranchConfig
+from bzrlib.plugins.svn.core import SubversionException
+from bzrlib.plugins.svn.errors import NotSvnBranchPath, ERR_FS_NO_SUCH_REVISION
++from bzrlib.plugins.svn.foreign import FakeControlFiles
+from bzrlib.plugins.svn.format import get_rich_root_format
+from bzrlib.plugins.svn.repository import SvnRepository
+from bzrlib.plugins.svn.tags import SubversionTags
+from bzrlib.plugins.svn.transport import bzr_to_svn_url
+
+import os
+
+class SvnBranch(Branch):
+ """Maps to a Branch in a Subversion repository """
+ def __init__(self, repository, branch_path, _skip_check=False):
+ """Instantiate a new SvnBranch.
+
+ :param repos: SvnRepository this branch is part of.
+ :param branch_path: Relative path inside the repository this
+ branch is located at.
+ :param revnum: Subversion revision number of the branch to
+ look at; none for latest.
+ """
+ self.repository = repository
+ super(SvnBranch, self).__init__()
+ assert isinstance(self.repository, SvnRepository)
+ self.control_files = FakeControlFiles()
+ self._format = SvnBranchFormat()
+ self._lock_mode = None
+ self._lock_count = 0
+ self.mapping = self.repository.get_mapping()
+ self.layout = self.repository.get_layout()
+ self._branch_path = branch_path.strip("/")
+ self.base = urlutils.join(self.repository.base,
+ self._branch_path).rstrip("/")
+ self._revmeta_cache = None
+ assert isinstance(self._branch_path, str)
+ if not _skip_check:
+ try:
+ revnum = self.get_revnum()
+ if self.repository.transport.check_path(self._branch_path,
+ revnum) != core.NODE_DIR:
+ raise NotBranchError(self.base)
+ except SubversionException, (_, num):
+ if num == ERR_FS_NO_SUCH_REVISION:
+ raise NotBranchError(self.base)
+ raise
+ (type, self.project, _, ip) = self.layout.parse(branch_path)
+ # FIXME: Don't allow tag here
+ if type not in ('branch', 'tag') or ip != '':
+ raise NotSvnBranchPath(branch_path, mapping=self.mapping)
+
+ def _make_tags(self):
+ return SubversionTags(self)
+
+ def set_branch_path(self, branch_path):
+ """Change the branch path for this branch.
+
+ :param branch_path: New branch path.
+ """
+ self._branch_path = branch_path.strip("/")
+
+ def _get_append_revisions_only(self):
+ value = self.get_config().get_user_option('append_revisions_only')
+ return value == 'True'
+
+ def unprefix(self, relpath):
+ """Remove the branch path from a relpath.
+
+ :param relpath: path from the repository root.
+ """
+ assert relpath.startswith(self.get_branch_path()), \
+ "expected %s prefix, got %s" % (self.get_branch_path(), relpath)
+ return relpath[len(self.get_branch_path()):].strip("/")
+
+ def get_branch_path(self, revnum=None):
+ """Find the branch path of this branch in the specified revnum.
+
+ :param revnum: Revnum to look for.
+ """
+ if revnum is None:
+ return self._branch_path
+
+ if revnum == self.get_revnum():
+ return self._branch_path
+
+ # Use revnum - this branch may have been moved in the past
+ return self.repository.transport.get_locations(
+ self._branch_path, self.get_revnum(),
+ [revnum])[revnum].strip("/")
+
+ def get_revnum(self):
+ """Obtain the Subversion revision number this branch was
+ last changed in.
+
+ :return: Revision number
+ """
+ if self._lock_mode == 'r' and self._cached_revnum:
+ return self._cached_revnum
+ latest_revnum = self.repository.get_latest_revnum()
+ self._cached_revnum = self.repository._log.find_latest_change(
+ self.get_branch_path(), latest_revnum)
+ if self._cached_revnum is None:
+ raise NotBranchError(self.base)
+ return self._cached_revnum
+
+ def check(self):
+ """See Branch.Check.
+
+ Doesn't do anything for Subversion repositories at the moment (yet).
+ """
+ return BranchCheckResult(self)
+
+ def _create_heavyweight_checkout(self, to_location, revision_id=None,
+ hardlink=False):
+ """Create a new heavyweight checkout of this branch.
+
+ :param to_location: URL of location to create the new checkout in.
+ :param revision_id: Revision that should be the tip of the checkout.
+ :param hardlink: Whether to hardlink
+ :return: WorkingTree object of checkout.
+ """
+ checkout_branch = BzrDir.create_branch_convenience(
+ to_location, force_new_tree=False, format=get_rich_root_format())
+ checkout = checkout_branch.bzrdir
+ checkout_branch.bind(self)
+ # pull up to the specified revision_id to set the initial
+ # branch tip correctly, and seed it with history.
+ checkout_branch.pull(self, stop_revision=revision_id)
+ return checkout.create_workingtree(revision_id, hardlink=hardlink)
+
+ def lookup_revision_id(self, revid):
+ """Look up the matching Subversion revision number on the mainline of
+ the branch.
+
+ :param revid: Revision id to look up.
+ :return: Revision number on the branch.
+ :raises NoSuchRevision: If the revision id was not found.
+ """
+ (bp, revnum, mapping) = self.repository.lookup_revision_id(revid,
+ ancestry=(self.get_branch_path(), self.get_revnum()))
+ assert bp.strip("/") == self.get_branch_path(revnum).strip("/"), \
+ "Got %r, expected %r" % (bp, self.get_branch_path(revnum))
+ return revnum
+
+ def _create_lightweight_checkout(self, to_location, revision_id=None):
+ """Create a new lightweight checkout of this branch.
+
+ :param to_location: URL of location to create the checkout in.
+ :param revision_id: Tip of the checkout.
+ :return: WorkingTree object of the checkout.
+ """
+ from bzrlib.plugins.svn.workingtree import update_wc
+ if revision_id is not None:
+ revnum = self.lookup_revision_id(revision_id)
+ else:
+ revnum = self.get_revnum()
+
+ svn_url = bzr_to_svn_url(self.base)
+ os.mkdir(to_location)
+ wc.ensure_adm(to_location, self.repository.uuid, svn_url,
+ bzr_to_svn_url(self.repository.base), revnum)
+ adm = wc.WorkingCopy(None, to_location, write_lock=True)
+ try:
+ conn = self.repository.transport.connections.get(svn_url)
+ try:
+ update_wc(adm, to_location, conn, revnum)
+ finally:
+ if not conn.busy:
+ self.repository.transport.add_connection(conn)
+ finally:
+ adm.close()
+ wt = WorkingTree.open(to_location)
+ return wt
+
+ def create_checkout(self, to_location, revision_id=None, lightweight=False,
+ accelerator_tree=None, hardlink=False):
+ """See Branch.create_checkout()."""
+ if lightweight:
+ return self._create_lightweight_checkout(to_location, revision_id)
+ else:
+ return self._create_heavyweight_checkout(to_location, revision_id,
+ hardlink=hardlink)
+
+ def generate_revision_id(self, revnum):
+ """Generate a new revision id for a revision on this branch."""
+ assert isinstance(revnum, int)
+ try:
+ return self.repository.generate_revision_id(
+ revnum, self.get_branch_path(revnum), self.mapping)
+ except SubversionException, (_, num):
+ if num == ERR_FS_NO_SUCH_REVISION:
+ raise NoSuchRevision(self, revnum)
+ raise
+
+ def get_config(self):
+ return BranchConfig(self)
+
+ def _get_nick(self):
+ """Find the nick name for this branch.
+
+ :return: Branch nick
+ """
+ bp = self._branch_path.strip("/")
+ if self._branch_path == "":
+ return self.base.split("/")[-1]
+ return bp
+
+ nick = property(_get_nick)
+
+ def set_revision_history(self, rev_history):
+ """See Branch.set_revision_history()."""
+ if (rev_history == [] or
+ not self.repository.has_revision(rev_history[-1])):
+ raise NotImplementedError("set_revision_history can't add ghosts")
+ push(self.repository.get_graph(),
+ self, self.repository, rev_history[-1])
+ self._clear_cached_state()
+
+ def set_last_revision_info(self, revno, revid):
+ """See Branch.set_last_revision_info()."""
+
+ def mainline_missing_revisions(self, other, stop_revision):
+ """Find the revisions missing on the mainline.
+
+ :param other: Other branch to retrieve revisions from.
+ :param stop_revision: Revision to stop fetching at.
+ """
+ missing = []
+ lastrevid = self.last_revision()
+ for revid in other.repository.iter_reverse_revision_history(stop_revision):
+ if lastrevid == revid:
+ missing.reverse()
+ return missing
+ missing.append(revid)
+ return None
+
+ def otherline_missing_revisions(self, other, stop_revision, overwrite=False):
+ """Find the revisions missing on the mainline.
+
+ :param other: Other branch to retrieve revisions from.
+ :param stop_revision: Revision to stop fetching at.
+ :param overwrite: Whether or not the existing data should be overwritten
+ """
+ missing = []
+ for revid in other.repository.iter_reverse_revision_history(stop_revision):
+ if self.repository.has_revision(revid):
+ missing.reverse()
+ return missing
+ missing.append(revid)
+ if not overwrite:
+ return None
+ else:
+ return missing
+
+ def last_revision_info(self):
+ """See Branch.last_revision_info()."""
+ last_revid = self.last_revision()
+ return self.revision_id_to_revno(last_revid), last_revid
+
+ def revision_id_to_revno(self, revision_id):
+ """Given a revision id, return its revno"""
+ if is_null(revision_id):
+ return 0
+ revmeta_history = self._revision_meta_history()
+ for revmeta in revmeta_history:
+ if revmeta.get_revision_id(self.mapping) == revision_id:
+ return len(revmeta_history) - revmeta_history.index(revmeta)
+ raise NoSuchRevision(self, revision_id)
+
+ def get_root_id(self, revnum=None):
+ if revnum is None:
+ tree = self.basis_tree()
+ else:
+ tree = self.repository.revision_tree(self.get_rev_id(revnum))
+ return tree.get_root_id()
+
+ def set_push_location(self, location):
+ """See Branch.set_push_location()."""
+ raise NotImplementedError(self.set_push_location)
+
+ def get_push_location(self):
+ """See Branch.get_push_location()."""
+ # get_push_location not supported on Subversion
+ return None
+
+ def _revision_meta_history(self):
+ if self._revmeta_cache is None:
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ self._revmeta_cache = list(self.repository.iter_reverse_branch_changes(self.get_branch_path(), self.get_revnum(), to_revnum=0, mapping=self.mapping, pb=pb))
+ finally:
+ pb.finished()
+ return self._revmeta_cache
+
+ def _gen_revision_history(self):
+ """Generate the revision history from last revision
+ """
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ history = []
+ for revmeta in self._revision_meta_history():
+ history.append(revmeta.get_revision_id(self.mapping))
+ finally:
+ pb.finished()
+ history.reverse()
+ return history
+
+ def last_revision(self):
+ """See Branch.last_revision()."""
+ # Shortcut for finding the tip. This avoids expensive generation time
+ # on large branches.
+ return self.generate_revision_id(self.get_revnum())
+
+ def pull(self, source, overwrite=False, stop_revision=None,
+ _hook_master=None, run_hooks=True, _push_merged=None):
+ """See Branch.pull()."""
+ result = PullResult()
+ result.source_branch = source
+ result.master_branch = None
+ result.target_branch = self
+ source.lock_read()
+ try:
+ (result.old_revno, result.old_revid) = self.last_revision_info()
+ self.update_revisions(source, stop_revision, overwrite,
+ _push_merged=_push_merged)
+ result.tag_conflicts = source.tags.merge_to(self.tags, overwrite)
+ (result.new_revno, result.new_revid) = self.last_revision_info()
+ return result
+ finally:
+ source.unlock()
+
+ def generate_revision_history(self, revision_id, last_rev=None,
+ other_branch=None):
+ """Create a new revision history that will finish with revision_id.
+
+ :param revision_id: the new tip to use.
+ :param last_rev: The previous last_revision. If not None, then this
+ must be a ancestory of revision_id, or DivergedBranches is raised.
+ :param other_branch: The other branch that DivergedBranches should
+ raise with respect to.
+ """
+ # stop_revision must be a descendant of last_revision
+ # make a new revision history from the graph
+
+ def _synchronize_history(self, destination, revision_id):
+ """Synchronize last revision and revision history between branches.
+
+ This version is most efficient when the destination is also a
+ BzrBranch6, but works for BzrBranch5, as long as the destination's
+ repository contains all the lefthand ancestors of the intended
+ last_revision. If not, set_last_revision_info will fail.
+
+ :param destination: The branch to copy the history into
+ :param revision_id: The revision-id to truncate history at. May
+ be None to copy complete history.
+ """
+ if revision_id is None:
+ revno, revision_id = self.last_revision_info()
+ else:
+ revno = self.revision_id_to_revno(revision_id)
+ destination.set_last_revision_info(revno, revision_id)
+
+ def update_revisions(self, other, stop_revision=None, overwrite=False,
+ graph=None, _push_merged=False):
+ """See Branch.update_revisions()."""
+ if stop_revision is None:
+ stop_revision = ensure_null(other.last_revision())
+ if (self.last_revision() == stop_revision or
+ self.last_revision() == other.last_revision()):
+ return
+ if graph is None:
+ graph = self.repository.get_graph()
+ other_graph = other.repository.get_graph()
+ if not other_graph.is_ancestor(self.last_revision(),
+ stop_revision):
+ if graph.is_ancestor(stop_revision, self.last_revision()):
+ return
+ if not overwrite:
+ raise DivergedBranches(self, other)
+ todo = self.mainline_missing_revisions(other, stop_revision)
+ if todo is None:
+ # Not possible to add cleanly onto mainline, perhaps need a replace operation
+ todo = self.otherline_missing_revisions(other, stop_revision, overwrite)
+ if todo is None:
+ raise DivergedBranches(self, other)
+ if _push_merged is None:
+ _push_merged = self.layout.push_merged_revisions(self.project)
+ self._push_missing_revisions(graph, other, other_graph, todo,
+ _push_merged)
+
+ def _push_missing_revisions(self, my_graph, other, other_graph, todo,
+ push_merged=False):
+ pb = ui.ui_factory.nested_progress_bar()
+ try:
+ for revid in todo:
+ pb.update("pushing revisions", todo.index(revid),
+ len(todo))
+ if push_merged:
+ parent_revids = other_graph.get_parent_map([revid])[revid]
+ push_ancestors(self.repository, other.repository, self.layout, self.project, parent_revids, other_graph)
+ push(my_graph, self, other.repository, revid)
+ self._clear_cached_state()
+ finally:
+ pb.finished()
+
+ def lock_write(self):
+ """See Branch.lock_write()."""
+ # TODO: Obtain lock on the remote server?
+ if self._lock_mode:
+ assert self._lock_mode == 'w'
+ self._lock_count += 1
+ else:
+ self._lock_mode = 'w'
+ self._lock_count = 1
+ self.repository.lock_write()
+
+ def lock_read(self):
+ """See Branch.lock_read()."""
+ if self._lock_mode:
+ assert self._lock_mode in ('r', 'w')
+ self._lock_count += 1
+ else:
+ self._lock_mode = 'r'
+ self._lock_count = 1
+ self.repository.lock_read()
+
+ def unlock(self):
+ """See Branch.unlock()."""
+ self._lock_count -= 1
+ if self._lock_count == 0:
+ self._lock_mode = None
+ self._clear_cached_state()
+ self.repository.unlock()
+
+ def _clear_cached_state(self):
+ super(SvnBranch, self)._clear_cached_state()
+ self._cached_revnum = None
+ self._revmeta_cache = None
+
+ def get_parent(self):
+ """See Branch.get_parent()."""
+ return None
+
+ def set_parent(self, url):
+ """See Branch.set_parent()."""
+
+ def get_physical_lock_status(self):
+ """See Branch.get_physical_lock_status()."""
+ return False
+
+ def sprout(self, to_bzrdir, revision_id=None):
+ """See Branch.sprout()."""
+ result = to_bzrdir.create_branch()
+ self.copy_content_into(result, revision_id=revision_id)
+ result.set_parent(self.bzrdir.root_transport.base)
+ return result
+
+ def get_stacked_on_url(self):
+ raise UnstackableBranchFormat(self._format, self.base)
+
+ def __str__(self):
+ return '%s(%r)' % (self.__class__.__name__, self.base)
+
+ def supports_tags(self):
+ return self._format.supports_tags()
+
+ __repr__ = __str__
+
+
+class SvnBranchFormat(BranchFormat):
+ """Branch format for Subversion Branches."""
+ def __init__(self):
+ BranchFormat.__init__(self)
+
+ def __get_matchingbzrdir(self):
+ """See BranchFormat.__get_matchingbzrdir()."""
+ from remote import SvnRemoteFormat
+ return SvnRemoteFormat()
+
+ _matchingbzrdir = property(__get_matchingbzrdir)
+
+ def get_format_description(self):
+ """See BranchFormat.get_format_description."""
+ return 'Subversion Smart Server'
+
+ def get_format_string(self):
+ """See BranchFormat.get_format_string()."""
+ return 'Subversion Smart Server'
+
+ def initialize(self, to_bzrdir):
+ """See BranchFormat.initialize()."""
+ raise NotImplementedError(self.initialize)
+
+ def supports_tags(self):
+ return True
--- /dev/null
--- /dev/null
++_trial_temp
--- /dev/null
--- /dev/null
++- Import VirtualRevisionTexts, VirtualInventoryTexts, VirtualSignatureTexts
++- Import dpush command
++- Import CommitBuilder-based fetcher
--- /dev/null
- from bzrlib import registry
+# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Foreign branch utilities."""
+
++from bzrlib import errors, registry
+
+
+class VcsMapping(object):
+ """Describes the mapping between the semantics of Bazaar and a foreign vcs.
+
+ """
+ experimental = False
+ roundtripping = False
+ revid_prefix = None
+
+
+class VcsMappingRegistry(registry.Registry):
+ """Registry for Bazaar<->foreign VCS mappings.
+
+ There should be one instance of this registry for every foreign VCS.
+ """
+
+ def register(self, key, factory, help):
+ """Register a mapping between Bazaar and foreign VCS semantics.
+
+ The factory must be a callable that takes one parameter: the key.
+ It must produce an instance of VcsMapping when called.
+ """
+ registry.Registry.register(self, key, factory, help)
+
+ def set_default(self, key):
+ """Set the 'default' key to be a clone of the supplied key.
+
+ This method must be called once and only once.
+ """
+ self._set_default_key(key)
+
+ def get_default(self):
+ """Convenience function for obtaining the default mapping to use."""
+ return self.get(self._get_default_key())
+
++
++class FakeControlFiles(object):
++ """Dummy implementation of ControlFiles.
++
++ This is required as some code relies on controlfiles being
++ available."""
++ def get_utf8(self, name):
++ raise errors.NoSuchFile(name)
++
++ def get(self, name):
++ raise errors.NoSuchFile(name)
++
++ def break_lock(self):
++ pass
++
--- /dev/null
+ # Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
+
+ # This program is free software; you can redistribute it and/or modify
+ # it under the terms of the GNU General Public License as published by
+ # the Free Software Foundation; either version 3 of the License, or
+ # (at your option) any later version.
+
+ # This program is distributed in the hope that it will be useful,
+ # but WITHOUT ANY WARRANTY; without even the implied warranty of
+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ # GNU General Public License for more details.
+
+ # You should have received a copy of the GNU General Public License
+ # along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ from bzrlib import osutils
+ from bzrlib.tests import TestCase
+
+ from versionedfiles import (VirtualRevisionTexts, VirtualInventoryTexts,
+ VirtualSignatureTexts)
+
+
+ class BasicTextsTests:
+ def test_add_lines(self):
+ self.assertRaises(NotImplementedError,
+ self.texts.add_lines, "foo", [], [])
+
+ def test_add_mpdiffs(self):
+ self.assertRaises(NotImplementedError,
+ self.texts.add_mpdiffs, [])
+
+ def test_check(self):
+ self.assertTrue(self.texts.check())
+
+ def test_insert_record_stream(self):
+ self.assertRaises(NotImplementedError, self.texts.insert_record_stream,
+ [])
+
+
+ class VirtualRevisionTextsTests(TestCase, BasicTextsTests):
+ def _make_parents_provider(self):
+ return self
+
+ def setUp(self):
+ self.texts = VirtualRevisionTexts(self)
+
+ def get_parent_map(self, keys):
+ raise NotImplementedError
+
+
+ class VirtualInventoryTextsTests(TestCase, BasicTextsTests):
+ def _make_parents_provider(self):
+ return self
+
+ def get_inventory_xml(self, key):
+ return "FOO"
+
+ def get_parent_map(self, keys):
+ return {("A",): (("B",))}
+
+ def setUp(self):
+ self.texts = VirtualInventoryTexts(self)
+
+ def test_get_sha1s(self):
+ self.assertEquals({("A",): osutils.sha_strings(["FOO"])}, self.texts.get_sha1s([("A",)]))
+
+
+ class VirtualSignatureTextsTests(TestCase, BasicTextsTests):
+ def _make_parents_provider(self):
+ return self
+
+ def setUp(self):
+ self.texts = VirtualSignatureTexts(self)
+
+ def get_parent_map(self, keys):
+ raise NotImplementedError
+
--- /dev/null
--- /dev/null
++# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
++
++# This program is free software; you can redistribute it and/or modify
++# it under the terms of the GNU General Public License as published by
++# the Free Software Foundation; either version 3 of the License, or
++# (at your option) any later version.
++
++# This program is distributed in the hope that it will be useful,
++# but WITHOUT ANY WARRANTY; without even the implied warranty of
++# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
++# GNU General Public License for more details.
++
++# You should have received a copy of the GNU General Public License
++# along with this program. If not, see <http://www.gnu.org/licenses/>.
++
++from bzrlib import osutils, urlutils
++from bzrlib.versionedfile import FulltextContentFactory, VersionedFiles, VirtualVersionedFiles
++
++from cStringIO import StringIO
++
++
++class VirtualRevisionTexts(VirtualVersionedFiles):
++ """Virtual revisions backend."""
++ def __init__(self, repository):
++ self.repository = repository
++ super(VirtualRevisionTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
++
++ def get_lines(self, key):
++ return osutils.split_lines(self.repository.get_revision_xml(key))
++
++ # TODO: annotate, iter_lines_added_or_present_in_keys, keys
++
++
++class VirtualInventoryTexts(VirtualVersionedFiles):
++ """Virtual inventories backend."""
++ def __init__(self, repository):
++ self.repository = repository
++ super(VirtualInventoryTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
++
++ def get_lines(self, key):
++ return osutils.split_lines(self.repository.get_inventory_xml(key))
++
++ # TODO: annotate, iter_lines_added_or_present_in_keys, keys
++
++
++class VirtualSignatureTexts(VirtualVersionedFiles):
++ """Virtual signatures backend."""
++ def __init__(self, repository):
++ self.repository = repository
++ super(VirtualSignatureTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
++
++ def get_lines(self, key):
++ return osutils.split_lines(self.repository.get_signature_text(key))
++
++ # TODO: annotate, iter_lines_added_or_present_in_keys, keys
++
--- /dev/null
+# Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Tests for the bzr-svn plugin."""
+
+import os
+import sys
+import bzrlib
+
+from cStringIO import StringIO
+
+from bzrlib import osutils, urlutils
+from bzrlib.bzrdir import BzrDir
+from bzrlib.tests import TestCaseInTempDir
+from bzrlib.trace import mutter
+from bzrlib.workingtree import WorkingTree
+
+from bzrlib.plugins.svn import properties, ra, repos
+from bzrlib.plugins.svn.delta import send_stream
+from bzrlib.plugins.svn.client import Client
+from bzrlib.plugins.svn.ra import Auth, RemoteAccess
+
+class TestFileEditor(object):
+ def __init__(self, file):
+ self.file = file
+ self.is_closed = False
+
+ def change_prop(self, name, value):
+ self.file.change_prop(name, value)
+
+ def modify(self, contents=None):
+ if contents is None:
+ contents = osutils.rand_chars(100)
+ txdelta = self.file.apply_textdelta()
+ send_stream(StringIO(contents), txdelta)
+
+ def close(self):
+ assert not self.is_closed
+ self.is_closed = True
+ self.file.close()
+
+
+class TestDirEditor(object):
+ def __init__(self, dir, baseurl, revnum):
+ self.dir = dir
+ self.baseurl = baseurl
+ self.revnum = revnum
+ self.is_closed = False
+ self.children = []
+
+ def close_children(self):
+ for c in reversed(self.children):
+ if not c.is_closed:
+ c.close()
+
+ def close(self):
+ assert not self.is_closed
+ self.is_closed = True
+ self.close_children()
+ self.dir.close()
+
+ def change_prop(self, name, value):
+ self.dir.change_prop(name, value)
+
+ def open_dir(self, path):
+ self.close_children()
+ child = TestDirEditor(self.dir.open_directory(path, -1), self.baseurl, self.revnum)
+ self.children.append(child)
+ return child
+
+ def open_file(self, path):
+ self.close_children()
+ child = TestFileEditor(self.dir.open_file(path, -1))
+ self.children.append(child)
+ return child
+
+ def add_dir(self, path, copyfrom_path=None, copyfrom_rev=-1):
+ self.close_children()
+ if copyfrom_path is not None:
+ copyfrom_path = urlutils.join(self.baseurl, copyfrom_path)
+ if copyfrom_path is not None and copyfrom_rev == -1:
+ copyfrom_rev = self.revnum
+ assert (copyfrom_path is None and copyfrom_rev == -1) or \
+ (copyfrom_path is not None and copyfrom_rev > -1)
+ child = TestDirEditor(self.dir.add_directory(path, copyfrom_path, copyfrom_rev), self.baseurl, self.revnum)
+ self.children.append(child)
+ return child
+
+ def add_file(self, path, copyfrom_path=None, copyfrom_rev=-1):
+ self.close_children()
+ if copyfrom_path is not None:
+ copyfrom_path = urlutils.join(self.baseurl, copyfrom_path)
+ if copyfrom_path is not None and copyfrom_rev == -1:
+ copyfrom_rev = self.revnum
+ child = TestFileEditor(self.dir.add_file(path, copyfrom_path, copyfrom_rev))
+ self.children.append(child)
+ return child
+
+ def delete(self, path):
+ self.dir.delete_entry(path)
+
+
+class TestCommitEditor(TestDirEditor):
+ def __init__(self, editor, baseurl, revnum):
+ self.editor = editor
+ TestDirEditor.__init__(self, self.editor.open_root(), baseurl, revnum)
+
+ def close(self):
+ TestDirEditor.close(self)
+ self.editor.close()
+
+
+class TestCaseWithSubversionRepository(TestCaseInTempDir):
+ """A test case that provides the ability to build Subversion
+ repositories."""
+
+ def setUp(self):
+ super(TestCaseWithSubversionRepository, self).setUp()
+ self.client_ctx = Client()
+ self.client_ctx.auth = Auth([ra.get_simple_provider(),
+ ra.get_username_provider(),
+ ra.get_ssl_client_cert_file_provider(),
+ ra.get_ssl_client_cert_pw_file_provider(),
+ ra.get_ssl_server_trust_file_provider()])
+ self.client_ctx.log_msg_func = self.log_message_func
+ #self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err)
+
+ def log_message_func(self, items):
+ return self.next_message
+
+ def make_repository(self, relpath, allow_revprop_changes=True):
+ """Create a repository.
+
+ :return: Handle to the repository.
+ """
+ abspath = os.path.join(self.test_dir, relpath)
+
+ repos.create(abspath)
+
+ if allow_revprop_changes:
+ if sys.platform == 'win32':
+ revprop_hook = os.path.join(abspath, "hooks", "pre-revprop-change.bat")
+ open(revprop_hook, 'w').write("exit 0\n")
+ else:
+ revprop_hook = os.path.join(abspath, "hooks", "pre-revprop-change")
+ open(revprop_hook, 'w').write("#!/bin/sh\n")
+ os.chmod(revprop_hook, os.stat(revprop_hook).st_mode | 0111)
+
+ return urlutils.local_path_to_url(abspath)
+
+ def make_local_bzrdir(self, repos_path, relpath):
+ """Create a repository and checkout."""
+
+ repos_url = self.make_repository(repos_path)
+
+ self.make_checkout(repos_url, relpath)
+
+ return BzrDir.open(relpath)
+
+ def make_checkout(self, repos_url, relpath):
+ self.client_ctx.checkout(repos_url, relpath, "HEAD")
+
+ def client_set_prop(self, path, name, value):
+ if value is None:
+ value = ""
+ self.client_ctx.propset(name, value, path, False, True)
+
+ def client_get_prop(self, path, name, revnum=None, recursive=False):
+ if revnum is None:
+ rev = "WORKING"
+ else:
+ rev = revnum
+ ret = self.client_ctx.propget(name, path, rev, rev, recursive)
+ if recursive:
+ return ret
+ else:
+ return ret.values()[0]
+
+ def client_get_revprop(self, url, revnum, name):
+ r = ra.RemoteAccess(url)
+ return r.rev_proplist(revnum)[name]
+
+ def client_set_revprop(self, url, revnum, name, value):
+ r = ra.RemoteAccess(url)
+ r.change_rev_prop(revnum, name, value)
+
+ def client_commit(self, dir, message=None, recursive=True):
+ """Commit current changes in specified working copy.
+
+ :param relpath: List of paths to commit.
+ """
+ olddir = os.path.abspath('.')
+ self.next_message = message
+ os.chdir(dir)
+ info = self.client_ctx.commit(["."], recursive, False)
+ os.chdir(olddir)
+ assert info is not None
+ return info
+
+ def client_add(self, relpath, recursive=True):
+ """Add specified files to working copy.
+
+ :param relpath: Path to the files to add.
+ """
+ self.client_ctx.add(relpath, recursive, False, False)
+
+ def client_log(self, url, start_revnum, stop_revnum):
+ r = ra.RemoteAccess(url)
+ assert isinstance(url, str)
+ ret = {}
+ def rcvr(orig_paths, rev, revprops, has_children):
+ ret[rev] = (orig_paths, revprops.get(properties.PROP_REVISION_AUTHOR), revprops.get(properties.PROP_REVISION_DATE), revprops.get(properties.PROP_REVISION_LOG))
+ r.get_log(rcvr, [""], start_revnum, stop_revnum, 0, True, True,
+ revprops=[properties.PROP_REVISION_AUTHOR, properties.PROP_REVISION_DATE, properties.PROP_REVISION_LOG])
+ return ret
+
+ def client_delete(self, relpath):
+ """Remove specified files from working copy.
+
+ :param relpath: Path to the files to remove.
+ """
+ self.client_ctx.delete([relpath], True)
+
+ def client_copy(self, oldpath, newpath, revnum=None):
+ """Copy file in working copy.
+
+ :param oldpath: Relative path to original file.
+ :param newpath: Relative path to new file.
+ """
+ if revnum is None:
+ rev = "HEAD"
+ else:
+ rev = revnum
+ self.client_ctx.copy(oldpath, newpath, rev)
+
+ def client_update(self, path):
+ self.client_ctx.update([path], "HEAD", True)
+
+ def build_tree(self, files):
+ """Create a directory tree.
+
+ :param files: Dictionary with filenames as keys, contents as
+ values. None as value indicates a directory.
+ """
+ for f in files:
+ if files[f] is None:
+ try:
+ os.makedirs(f)
+ except OSError:
+ pass
+ else:
+ try:
+ os.makedirs(os.path.dirname(f))
+ except OSError:
+ pass
+ open(f, 'w').write(files[f])
+
+ def make_client_and_bzrdir(self, repospath, clientpath):
+ repos_url = self.make_client(repospath, clientpath)
+
+ return BzrDir.open("svn+%s" % repos_url)
+
+ def make_client(self, repospath, clientpath, allow_revprop_changes=True):
+ """Create a repository and a checkout. Return the checkout.
+
+ :param relpath: Optional relpath to check out if not the full
+ repository.
+ :param clientpath: Path to checkout
+ :return: Repository URL.
+ """
+ repos_url = self.make_repository(repospath,
+ allow_revprop_changes=allow_revprop_changes)
+ self.make_checkout(repos_url, clientpath)
+ return repos_url
+
+ def open_fs(self, relpath):
+ """Open a fs.
+
+ :return: FS.
+ """
+ return repos.Repository(relpath).fs()
+
+ def get_commit_editor(self, url, message="Test commit"):
+ ra = RemoteAccess(url.encode("utf-8"))
+ revnum = ra.get_latest_revnum()
+ return TestCommitEditor(ra.get_commit_editor({"svn:log": message}), ra.url, revnum)
+
+
+def test_suite():
+ from unittest import TestSuite
+
+ from bzrlib.tests import TestUtil
+
+ loader = TestUtil.TestLoader()
+
+ suite = TestSuite()
+
+ testmod_names = [
+ 'test_branch',
+ 'test_branchprops',
+ 'test_changes',
+ 'test_checkout',
+ 'test_client',
+ 'test_commit',
+ 'test_config',
+ 'test_convert',
+ 'test_core',
+ 'test_delta',
+ 'test_errors',
+ 'test_fetch',
+ 'test_fileids',
+ 'test_log',
+ 'test_logwalker',
+ 'test_mapping',
+ 'test_properties',
+ 'test_push',
+ 'test_ra',
+ 'test_radir',
+ 'test_repos',
+ 'test_repository',
+ 'test_revids',
+ 'test_revspec',
+ 'test_scheme',
+ 'test_svk',
+ 'test_transport',
+ 'test_tree',
+ 'test_upgrade',
+ 'test_versionedfiles',
+ 'test_wc',
+ 'test_workingtree',
+ 'test_blackbox']
+ suite.addTest(loader.loadTestsFromModuleNames(["%s.%s" % (__name__, i) for i in testmod_names]))
++ suite.addTest(loader.loadTestsFromModuleNames(["bzrlib.plugins.svn.foreign.test_versionedfiles"]))
+
+ return suite
--- /dev/null
- from bzrlib.plugins.svn.versionedfiles import (SvnTexts, VirtualRevisionTexts,
- VirtualInventoryTexts, VirtualSignatureTexts)
+# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from bzrlib import osutils
+from bzrlib.tests import TestCase
+
- class BasicSvnTextsTests:
- def test_add_lines(self):
- self.assertRaises(NotImplementedError,
- self.texts.add_lines, "foo", [], [])
-
- def test_add_mpdiffs(self):
- self.assertRaises(NotImplementedError,
- self.texts.add_mpdiffs, [])
-
- def test_check(self):
- self.assertTrue(self.texts.check())
-
- def test_insert_record_stream(self):
- self.assertRaises(NotImplementedError, self.texts.insert_record_stream,
- [])
-
-
- class SvnTextsTests(TestCase, BasicSvnTextsTests):
++from bzrlib.plugins.svn.foreign.test_versionedfiles import BasicTextsTests
++from bzrlib.plugins.svn.versionedfiles import SvnTexts
+
+
-
- class VirtualRevisionTextsTests(TestCase, BasicSvnTextsTests):
- def _make_parents_provider(self):
- return self
-
- def setUp(self):
- self.texts = VirtualRevisionTexts(self)
-
- def get_parent_map(self, keys):
- raise NotImplementedError
-
-
- class VirtualInventoryTextsTests(TestCase, BasicSvnTextsTests):
- def _make_parents_provider(self):
- return self
-
- def get_inventory_xml(self, key):
- return "FOO"
-
- def get_parent_map(self, keys):
- return {("A",): (("B",))}
-
- def setUp(self):
- self.texts = VirtualInventoryTexts(self)
-
- def test_get_sha1s(self):
- self.assertEquals({("A",): osutils.sha_strings(["FOO"])}, self.texts.get_sha1s([("A",)]))
-
-
- class VirtualSignatureTextsTests(TestCase, BasicSvnTextsTests):
- def _make_parents_provider(self):
- return self
-
- def setUp(self):
- self.texts = VirtualSignatureTexts(self)
-
- def get_parent_map(self, keys):
- raise NotImplementedError
-
++class SvnTextsTests(TestCase, BasicTextsTests):
+ def setUp(self):
+ self.texts = SvnTexts(self)
+
from bzrlib import osutils, urlutils
from bzrlib.versionedfile import FulltextContentFactory, VersionedFiles, VirtualVersionedFiles
-from cStringIO import StringIO
-
-
-class VirtualRevisionTexts(VirtualVersionedFiles):
- """Virtual revisions backend."""
- def __init__(self, repository):
- self.repository = repository
- super(VirtualRevisionTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_revision_xml(key))
+from bzrlib.plugins.svn.core import SubversionException
+from bzrlib.plugins.svn.errors import ERR_FS_NOT_FILE
++from bzrlib.plugins.svn.foreign.versionedfiles import VirtualSignatureTexts, VirtualRevisionTexts, VirtualInventoryTexts
- # TODO: annotate, iter_lines_added_or_present_in_keys, keys
+from cStringIO import StringIO
+class SvnTexts(VersionedFiles):
+ """Subversion texts backend."""
-class VirtualInventoryTexts(VirtualVersionedFiles):
- """Virtual inventories backend."""
def __init__(self, repository):
self.repository = repository
- super(VirtualInventoryTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_inventory_xml(key))
-
- # TODO: annotate, iter_lines_added_or_present_in_keys, keys
+ def check(self, progressbar=None):
+ return True
+
+ def add_mpdiffs(self, records):
+ raise NotImplementedError(self.add_mpdiffs)
+
+ def get_record_stream(self, keys, ordering, include_delta_closure):
+ # TODO: there may be valid text revisions that only exist as
+ # ghosts in the repository itself. This function will
+ # not be able to report them.
+ # TODO: Sort keys by file id and issue just one get_file_revs() call
+ # per file-id ?
+ for (fileid, revid) in list(keys):
+ (branch, revnum, mapping) = self.repository.lookup_revision_id(revid)
+ map = self.repository.get_fileid_map(revnum, branch, mapping)
+ # Unfortunately, the map is the other way around
+ lines = None
+ for k, (v, ck) in map.items():
+ if v == fileid:
+ try:
+ stream = StringIO()
+ self.repository.transport.get_file(urlutils.join(branch, k), stream, revnum)
+ stream.seek(0)
+ lines = stream.readlines()
+ except SubversionException, (_, num):
+ if num == ERR_FS_NOT_FILE:
+ lines = []
+ else:
+ raise
+ break
+ if lines is None:
+ raise Exception("Inconsistent key specified: (%r,%r)" % (fileid, revid))
+ yield FulltextContentFactory((fileid, revid), None,
+ sha1=osutils.sha_strings(lines),
+ text=''.join(lines))
+
+ def get_parent_map(self, keys):
+ invs = {}
+
+ # First, figure out the revision number/path
+ ret = {}
+ for (fileid, revid) in keys:
+ # FIXME: Evil hack
+ ret[(fileid, revid)] = None
+ return ret
+
+ # TODO: annotate, get_sha1s, iter_lines_added_or_present_in_keys, keys
-class VirtualSignatureTexts(VirtualVersionedFiles):
- """Virtual signatures backend."""
- def __init__(self, repository):
- self.repository = repository
- super(VirtualSignatureTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_signature_text(key))
- class VirtualRevisionTexts(VirtualVersionedFiles):
- """Virtual revisions backend."""
- def __init__(self, repository):
- self.repository = repository
- super(VirtualRevisionTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_revision_xml(key))
-
- # TODO: annotate, iter_lines_added_or_present_in_keys, keys
-
-
- class VirtualInventoryTexts(VirtualVersionedFiles):
- """Virtual inventories backend."""
- def __init__(self, repository):
- self.repository = repository
- super(VirtualInventoryTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_inventory_xml(key))
-
- # TODO: annotate, iter_lines_added_or_present_in_keys, keys
-
-
- class VirtualSignatureTexts(VirtualVersionedFiles):
- """Virtual signatures backend."""
- def __init__(self, repository):
- self.repository = repository
- super(VirtualSignatureTexts, self).__init__(self.repository._make_parents_provider().get_parent_map, self.get_lines)
-
- def get_lines(self, key):
- return osutils.split_lines(self.repository.get_signature_text(key))
-
-- # TODO: annotate, iter_lines_added_or_present_in_keys, keys