--- /dev/null
- from bzrlib.errors import BzrError, InvalidRevisionId
+# Copyright (C) 2006,2008 by Jelmer Vernooij
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""Upgrading revisions made with older versions of the mapping."""
+
+from bzrlib import ui
- from bzrlib.plugins.svn.mapping import mapping_registry
-
++from bzrlib.errors import BzrError, InvalidRevisionId, DependencyNotPresent
+from bzrlib.revision import Revision
+from bzrlib.trace import info
+
+import itertools
+
+class RebaseNotPresent(DependencyNotPresent):
+ _fmt = "Unable to import bzr-rebase (required for upgrade support): %(error)s"
+
+ def __init__(self, error):
+ DependencyNotPresent.__init__(self, 'bzr-rebase', error)
+
+
+def check_rebase_version(min_version):
+ """Check what version of bzr-rebase is installed.
+
+ Raises an exception when the version installed is older than
+ min_version.
+
+ :raises RebaseNotPresent: Raised if bzr-rebase is not installed or too old.
+ """
+ try:
+ from bzrlib.plugins.rebase import version_info as rebase_version_info
+ if rebase_version_info[:2] < min_version:
+ raise RebaseNotPresent("Version %r present, at least %r required"
+ % (rebase_version_info, min_version))
+ except ImportError, e:
+ raise RebaseNotPresent(e)
+
+
+
+class UpgradeChangesContent(BzrError):
+ """Inconsistency was found upgrading the mapping of a revision."""
+ _fmt = """Upgrade will change contents in revision %(revid)s. Use --allow-changes to override."""
+
+ def __init__(self, revid):
+ self.revid = revid
+
+
+
+def create_upgraded_revid(revid, mapping_suffix, upgrade_suffix="-upgrade"):
+ """Create a new revision id for an upgraded version of a revision.
+
+ Prevents suffix to be appended needlessly.
+
+ :param revid: Original revision id.
+ :return: New revision id
+ """
+ if revid.endswith(upgrade_suffix):
+ return revid[0:revid.rfind("-svn")] + mapping_suffix + upgrade_suffix
+ else:
+ return revid + mapping_suffix + upgrade_suffix
+
+
+def determine_fileid_renames(old_tree, new_tree):
+ for old_file_id in old_tree:
+ new_file_id = new_tree.path2id(old_tree.id2path(old_file_id))
+ if new_file_id is not None:
+ yield old_file_id, new_file_id
+
+
+def upgrade_workingtree(wt, foreign_repository, new_mapping, mapping_registry,
+ allow_changes=False, verbose=False):
+ """Upgrade a working tree.
+
+ :param foreign_repository: Foreign repository object
+ """
+ wt.lock_write()
+ try:
+ old_revid = wt.last_revision()
+ renames = upgrade_branch(wt.branch, foreign_repository, new_mapping=new_mapping,
+ mapping_registry=mapping_registry,
+ allow_changes=allow_changes, verbose=verbose)
+ last_revid = wt.branch.last_revision()
+ wt.set_last_revision(last_revid)
+
+ # Adjust file ids in working tree
+ for (old_fileid, new_fileid) in determine_fileid_renames(wt.branch.repository.revision_tree(old_revid), wt.basis_tree()):
+ path = wt.id2path(old_fileid)
+ wt.remove(path)
+ wt.add([path], [new_fileid])
+ finally:
+ wt.unlock()
+
+ return renames
+
+
+def upgrade_branch(branch, foreign_repository, new_mapping=None,
+ allow_changes=False, verbose=False):
+ """Upgrade a branch to the current mapping version.
+
+ :param branch: Branch to upgrade.
+ :param foreign_repository: Repository to fetch new revisions from
+ :param allow_changes: Allow changes in mappings.
+ :param verbose: Whether to print verbose list of rewrites
+ """
+ revid = branch.last_revision()
+ renames = upgrade_repository(branch.repository, foreign_repository,
+ revision_id=revid, new_mapping=new_mapping,
+ allow_changes=allow_changes, verbose=verbose)
+ if len(renames) > 0:
+ branch.generate_revision_history(renames[revid])
+ return renames
+
+
+def check_revision_changed(oldrev, newrev):
+ """Check if two revisions are different. This is exactly the same
+ as Revision.equals() except that it does not check the revision_id."""
+ if (newrev.inventory_sha1 != oldrev.inventory_sha1 or
+ newrev.timestamp != oldrev.timestamp or
+ newrev.message != oldrev.message or
+ newrev.timezone != oldrev.timezone or
+ newrev.committer != oldrev.committer or
+ newrev.properties != oldrev.properties):
+ raise UpgradeChangesContent(oldrev.revision_id)
+
+
+def generate_upgrade_map(new_mapping, revs, mapping_registry):
+ """Generate an upgrade map for use by bzr-rebase.
+
+ :param new_mapping: Mapping to upgrade revisions to.
+ :param revs: Iterator over revisions to upgrade.
+ :return: Map from old revids as keys, new revids as values stored in a
+ dictionary.
+ """
+ rename_map = {}
+ # Create a list of revisions that can be renamed during the upgade
+ for revid in revs:
+ assert isinstance(revid, str)
+ try:
+ (foreign_revid, mapping) = mapping_registry.parse_revision_id(revid)
+ except InvalidRevisionId:
+ # Not a foreign revision, nothing to do
+ continue
+ newrevid = new_mapping.revision_id_foreign_to_bzr(foreign_revid)
+ if revid == newrevid:
+ continue
+ rename_map[revid] = newrevid
+
+ return rename_map
+
+MIN_REBASE_VERSION = (0, 4)
+
+def create_upgrade_plan(repository, foreign_repository, new_mapping,
+ mapping_registry, revision_id=None, allow_changes=False):
+ """Generate a rebase plan for upgrading revisions.
+
+ :param repository: Repository to do upgrade in
+ :param foreign_repository: Subversion repository to fetch new revisions from.
+ :param new_mapping: New mapping to use.
+ :param revision_id: Revision to upgrade (None for all revisions in
+ repository.)
+ :param allow_changes: Whether an upgrade is allowed to change the contents
+ of revisions.
+ :return: Tuple with a rebase plan and map of renamed revisions.
+ """
+ from bzrlib.plugins.rebase.rebase import generate_transpose_plan
+ check_rebase_version(MIN_REBASE_VERSION)
+
+ graph = repository.get_graph()
+ if revision_id is None:
+ potential = repository.all_revision_ids()
+ else:
+ potential = itertools.imap(lambda (rev, parents): rev,
+ graph.iter_ancestry([revision_id]))
+ upgrade_map = generate_upgrade_map(new_mapping, potential, mapping_registry)
+
+ # Make sure all the required current version revisions are present
+ for revid in upgrade_map.values():
+ if not repository.has_revision(revid):
+ repository.fetch(foreign_repository, revid)
+
+ if not allow_changes:
+ for oldrevid, newrevid in upgrade_map.items():
+ oldrev = repository.get_revision(oldrevid)
+ newrev = repository.get_revision(newrevid)
+ check_revision_changed(oldrev, newrev)
+
+ if revision_id is None:
+ heads = repository.all_revision_ids()
+ else:
+ heads = [revision_id]
+
+ plan = generate_transpose_plan(graph.iter_ancestry(heads), upgrade_map,
+ graph,
+ lambda revid: create_upgraded_revid(revid, new_mapping.upgrade_suffix))
+ def remove_parents((oldrevid, (newrevid, parents))):
+ return (oldrevid, newrevid)
+ upgrade_map.update(dict(map(remove_parents, plan.items())))
+
+ return (plan, upgrade_map)
+
+
+def upgrade_repository(repository, foreign_repository, new_mapping,
+ mapping_registry, revision_id=None, allow_changes=False,
+ verbose=False):
+ """Upgrade the revisions in repository until the specified stop revision.
+
+ :param repository: Repository in which to upgrade.
+ :param foreign_repository: Repository to fetch new revisions from.
+ :param new_mapping: New mapping.
+ :param revision_id: Revision id up until which to upgrade, or None for
+ all revisions.
+ :param allow_changes: Allow changes to mappings.
+ :param verbose: Whether to print list of rewrites
+ :return: Dictionary of mapped revisions
+ """
+ check_rebase_version(MIN_REBASE_VERSION)
+ from bzrlib.plugins.rebase.rebase import (
+ replay_snapshot, rebase, rebase_todo)
+
+ # Find revisions that need to be upgraded, create
+ # dictionary with revision ids in key, new parents in value
+ try:
+ repository.lock_write()
+ foreign_repository.lock_read()
+ (plan, revid_renames) = create_upgrade_plan(repository, foreign_repository,
+ new_mapping, mapping_registry,
+ revision_id=revision_id,
+ allow_changes=allow_changes)
+ if verbose:
+ for revid in rebase_todo(repository, plan):
+ info("%s -> %s" % (revid, plan[revid][0]))
+ def fix_revid(revid):
+ try:
+ (foreign_revid, mapping) = mapping_registry.parse_revision_id(revid)
+ except InvalidRevisionId:
+ return revid
+ return new_mapping.revision_id_foreign_to_bzr(foreign_revid)
+ def replay(repository, oldrevid, newrevid, new_parents):
+ return replay_snapshot(repository, oldrevid, newrevid, new_parents,
+ revid_renames, fix_revid)
+ rebase(repository, plan, replay)
+ return revid_renames
+ finally:
+ repository.unlock()
+ foreign_repository.unlock()
+
--- /dev/null
+# Copyright (C) 2005-2008 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from bzrlib.branch import Branch
+from bzrlib.bzrdir import BzrDir
+from bzrlib.errors import NoSuchRevision
+from bzrlib.repository import Repository
+from bzrlib.tests import TestCase
+from bzrlib.trace import mutter
+
+from bzrlib.plugins.svn import format
+from bzrlib.plugins.svn.layout import TrunkLayout, RootLayout
+from bzrlib.plugins.svn.mapping import SVN_PROP_BZR_REVISION_ID, mapping_registry
+from bzrlib.plugins.svn.mapping3 import BzrSvnMappingv3FileProps, SVN_PROP_BZR_BRANCHING_SCHEME, set_property_scheme
+from bzrlib.plugins.svn.mapping3.scheme import NoBranchingScheme, ListBranchingScheme
+from bzrlib.plugins.svn.tests import SubversionTestCase
+from bzrlib.plugins.svn.tests.test_mapping import sha1
+
+class Mappingv3FilePropTests(TestCase):
+ def setUp(self):
+ self.mapping = BzrSvnMappingv3FileProps(NoBranchingScheme())
+
+ def test_generate_revid(self):
+ self.assertEqual("svn-v3-undefined:myuuid:branch:5",
+ BzrSvnMappingv3FileProps._generate_revision_id("myuuid", 5, "branch", "undefined"))
+
+ def test_generate_revid_nested(self):
+ self.assertEqual("svn-v3-undefined:myuuid:branch%2Fpath:5",
+ BzrSvnMappingv3FileProps._generate_revision_id("myuuid", 5, "branch/path", "undefined"))
+
+ def test_generate_revid_special_char(self):
+ self.assertEqual("svn-v3-undefined:myuuid:branch%2C:5",
+ BzrSvnMappingv3FileProps._generate_revision_id("myuuid", 5, "branch\x2c", "undefined"))
+
+ def test_generate_revid_nordic(self):
+ self.assertEqual("svn-v3-undefined:myuuid:branch%C3%A6:5",
+ BzrSvnMappingv3FileProps._generate_revision_id("myuuid", 5, u"branch\xe6".encode("utf-8"), "undefined"))
+
+ def test_parse_revid_simple(self):
+ self.assertEqual(("uuid", "", 4, "undefined"),
+ BzrSvnMappingv3FileProps._parse_revision_id(
+ "svn-v3-undefined:uuid::4"))
+
+ def test_parse_revid_nested(self):
+ self.assertEqual(("uuid", "bp/data", 4, "undefined"),
+ BzrSvnMappingv3FileProps._parse_revision_id(
+ "svn-v3-undefined:uuid:bp%2Fdata:4"))
+
+ def test_generate_file_id_root(self):
+ self.assertEqual("2@uuid:bp:", self.mapping.generate_file_id("uuid", 2, "bp", u""))
+
+ def test_generate_file_id_path(self):
+ self.assertEqual("2@uuid:bp:mypath",
+ self.mapping.generate_file_id("uuid", 2, "bp", u"mypath"))
+
+ def test_generate_file_id_long(self):
+ dir = "this/is/a" + ("/very"*40) + "/long/path/"
+ self.assertEqual("2@uuid:bp;" + sha1(dir+"filename"),
+ self.mapping.generate_file_id("uuid", 2, "bp", dir+u"filename"))
+
+ def test_generate_file_id_long_nordic(self):
+ dir = "this/is/a" + ("/very"*40) + "/long/path/"
+ self.assertEqual("2@uuid:bp;" + sha1((dir+u"filename\x2c\x8a").encode('utf-8')),
+ self.mapping.generate_file_id("uuid", 2, "bp", dir+u"filename\x2c\x8a"))
+
+ def test_generate_file_id_special_char(self):
+ self.assertEqual("2@uuid:bp:mypath%2C%C2%8A",
+ self.mapping.generate_file_id("uuid", 2, "bp", u"mypath\x2c\x8a"))
+
+ def test_generate_file_id_spaces(self):
+ self.assertFalse(" " in self.mapping.generate_file_id("uuid", 1, "b p", u"my path"))
+
+ def test_generate_svn_file_id(self):
+ self.assertEqual("2@uuid:bp:path",
+ self.mapping.generate_file_id("uuid", 2, "bp", u"path"))
+
+ def test_generate_svn_file_id_nordic(self):
+ self.assertEqual("2@uuid:bp:%C3%A6%C3%B8%C3%A5",
+ self.mapping.generate_file_id("uuid", 2, "bp", u"\xe6\xf8\xe5"))
+
+ def test_generate_svn_file_id_nordic_branch(self):
+ self.assertEqual("2@uuid:%C3%A6:%C3%A6%C3%B8%C3%A5",
+ self.mapping.generate_file_id("uuid", 2, u"\xe6".encode('utf-8'), u"\xe6\xf8\xe5"))
+
+
+class RepositoryTests(SubversionTestCase):
+
+ def setUp(self):
+ super(RepositoryTests, self).setUp()
+ self.repos_url = self.make_repository("d")
+ self._old_mapping = mapping_registry._get_default_key()
+ mapping_registry.set_default("v3")
+
+ def tearDown(self):
+ super(RepositoryTests, self).tearDown()
+ mapping_registry.set_default("v3")
+
+ def test_generate_revision_id_forced_revid(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.change_prop(SVN_PROP_BZR_REVISION_ID+"v3-none",
+ "2 someid\n")
+ dc.close()
+
+ repos = Repository.open(self.repos_url)
+ mapping = repos.get_mapping()
+ if not mapping.roundtripping:
+ raise TestNotApplicable()
+ revid = repos.generate_revision_id(1, "", mapping)
+ self.assertEquals("someid", revid)
+
+ def test_generate_revision_id_forced_revid_invalid(self):
+
+ dc = self.get_commit_editor(self.repos_url)
+ dc.change_prop(SVN_PROP_BZR_REVISION_ID+"v3-none", "corrupt-id\n")
+ dc.close()
+
+ repos = Repository.open(self.repos_url)
+ mapping = repos.get_mapping()
+ revid = repos.generate_revision_id(1, "", mapping)
+ self.assertEquals(
+ mapping.revision_id_foreign_to_bzr((repos.uuid, 1, "")),
+ revid)
+
+ def test_revision_ghost_parents(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("foo").modify("data")
+ dc.close()
+
+ dc = self.get_commit_editor(self.repos_url)
+ dc.open_file("foo").modify("data2")
+ dc.change_prop("bzr:ancestry:v3-none", "ghostparent\n")
+ dc.close()
+
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ self.assertEqual((),
+ repository.get_revision(
+ repository.generate_revision_id(0, "", mapping)).parent_ids)
+ self.assertEqual((repository.generate_revision_id(0, "", mapping),),
+ repository.get_revision(
+ repository.generate_revision_id(1, "", mapping)).parent_ids)
+ self.assertEqual((repository.generate_revision_id(1, "", mapping),
+ "ghostparent"),
+ repository.get_revision(
+ repository.generate_revision_id(2, "", mapping)).parent_ids)
+
+ def test_get_revision_id_overriden(self):
+ self.make_checkout(self.repos_url, 'dc')
+ repository = Repository.open(self.repos_url)
+ self.assertRaises(NoSuchRevision, repository.get_revision, "nonexisting")
+ self.build_tree({'dc/foo': "data"})
+ self.client_add("dc/foo")
+ self.client_commit("dc", "My Message")
+ self.build_tree({'dc/foo': "data2"})
+ self.client_set_prop("dc", "bzr:revision-id:v3-none",
+ "3 myrevid\n")
+ self.client_update("dc")
+ (num, date, author) = self.client_commit("dc", "Second Message")
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ if not mapping.roundtripping:
+ raise TestNotApplicable
+ revid = mapping.revision_id_foreign_to_bzr((repository.uuid, 2, ""))
+ rev = repository.get_revision("myrevid")
+ self.assertEqual((repository.generate_revision_id(1, "", mapping),),
+ rev.parent_ids)
+ self.assertEqual(rev.revision_id,
+ repository.generate_revision_id(2, "", mapping))
+ self.assertEqual(author, rev.committer)
+ self.assertIsInstance(rev.properties, dict)
+
+ def test_get_ancestry_merged(self):
+ self.make_checkout(self.repos_url, 'dc')
+ self.build_tree({'dc/foo': "data"})
+ self.client_add("dc/foo")
+ self.client_commit("dc", "My Message")
+ self.client_update("dc")
+ self.client_set_prop("dc", "bzr:ancestry:v3-none", "a-parent\n")
+ self.build_tree({'dc/foo': "data2"})
+ self.client_commit("dc", "Second Message")
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ self.assertEqual([None, repository.generate_revision_id(0, "", mapping)],
+ repository.get_ancestry(
+ repository.generate_revision_id(0, "", mapping)))
+ self.assertEqual([None, repository.generate_revision_id(0, "", mapping),
+ repository.generate_revision_id(1, "", mapping)],
+ repository.get_ancestry(
+ repository.generate_revision_id(1, "", mapping)))
+ self.assertEqual([None,
+ repository.generate_revision_id(0, "", mapping), "a-parent",
+ repository.generate_revision_id(1, "", mapping),
+ repository.generate_revision_id(2, "", mapping)],
+ repository.get_ancestry(
+ repository.generate_revision_id(2, "", mapping)))
+
+ def test_lookup_revision_id_overridden(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_dir("bloe")
+ dc.change_prop(SVN_PROP_BZR_REVISION_ID+"v3-none", "2 myid\n")
+ dc.close()
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ self.assertEqual(("", 1), repository.lookup_revision_id(
+ mapping.revision_id_foreign_to_bzr((repository.uuid, 1, "")))[:2])
+ self.assertEqual(("", 1),
+ repository.lookup_revision_id("myid")[:2])
+
+ def test_lookup_revision_id_overridden_invalid(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_dir("bloe")
+ dc.change_prop(SVN_PROP_BZR_REVISION_ID+"v3-none", "corrupt-entry\n")
+ dc.close()
+
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ self.assertEqual(("", 1), repository.lookup_revision_id(
+ mapping.revision_id_foreign_to_bzr((repository.uuid, 1, "")))[:2])
+ self.assertRaises(NoSuchRevision, repository.lookup_revision_id,
+ "corrupt-entry")
+
+ def test_lookup_revision_id_overridden_invalid_dup(self):
+ self.make_checkout(self.repos_url, 'dc')
+ self.build_tree({'dc/bloe': None})
+ self.client_add("dc/bloe")
+ self.client_set_prop("dc", SVN_PROP_BZR_REVISION_ID+"v3-none",
+ "corrupt-entry\n")
+ self.client_commit("dc", "foobar")
+ self.build_tree({'dc/bla': None})
+ self.client_add("dc/bla")
+ self.client_set_prop("dc", SVN_PROP_BZR_REVISION_ID+"v3-none",
+ "corrupt-entry\n2 corrupt-entry\n")
+ self.client_commit("dc", "foobar")
+ repository = Repository.open(self.repos_url)
+ mapping = repository.get_mapping()
+ self.assertEqual(("", 2), repository.lookup_revision_id(
+ mapping.revision_id_foreign_to_bzr((repository.uuid, 2, "")))[:2])
+ self.assertEqual(("", 1), repository.lookup_revision_id(
+ mapping.revision_id_foreign_to_bzr((repository.uuid, 1, "")))[:2])
+ self.assertEqual(("", 2), repository.lookup_revision_id(
+ "corrupt-entry")[:2])
+
+ def test_lookup_revision_id_overridden_not_found(self):
+ """Make sure a revision id that is looked up but doesn't exist
+ doesn't accidently end up in the revid cache."""
+ self.make_checkout(self.repos_url, 'dc')
+ self.build_tree({'dc/bloe': None})
+ self.client_add("dc/bloe")
+ self.client_set_prop("dc", SVN_PROP_BZR_REVISION_ID+"v3-none", "2 myid\n")
+ self.client_commit("dc", "foobar")
+ repository = Repository.open(self.repos_url)
+ self.assertRaises(NoSuchRevision,
+ repository.lookup_revision_id, "foobar")
+
+ def test_set_branching_scheme_property(self):
+ self.make_checkout(self.repos_url, 'dc')
+ self.client_set_prop("dc", SVN_PROP_BZR_BRANCHING_SCHEME,
+ "trunk\nbranches/*\nbranches/tmp/*")
+ self.client_commit("dc", "set scheme")
+ repository = Repository.open(self.repos_url)
+ self.assertEquals(ListBranchingScheme(["trunk", "branches/*", "branches/tmp/*"]).branch_list,
+ repository.get_mapping().scheme.branch_list)
+
+ def test_set_property_scheme(self):
+ self.make_checkout(self.repos_url, 'dc')
+ repos = Repository.open(self.repos_url)
+ set_property_scheme(repos, ListBranchingScheme(["bla/*"]))
+ self.client_update("dc")
+ self.assertEquals("bla/*\n",
+ self.client_get_prop("dc", SVN_PROP_BZR_BRANCHING_SCHEME))
+ self.assertEquals("Updating branching scheme for Bazaar.",
+ self.client_log(self.repos_url, 1, 1)[1][3])
+
+ def test_fetch_fileid_renames(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("test").modify("data")
+ dc.change_prop("bzr:file-ids", "test\tbla\n")
+ dc.change_prop("bzr:revision-info", "")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ dir = BzrDir.create("f", format.get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ mapping = oldrepos.get_mapping()
+ self.assertEqual("bla", newrepos.get_inventory(
+ oldrepos.generate_revision_id(1, "", mapping)).path2id("test"))
+
+ def test_fetch_ghosts(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("bla").modify("data")
+ dc.change_prop("bzr:ancestry:v3-none", "aghost\n")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ oldrepos.set_layout(RootLayout())
+ dir = BzrDir.create("f", format.get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ mapping = oldrepos.get_mapping()
+
+ rev = newrepos.get_revision(oldrepos.generate_revision_id(1, "", mapping))
+ self.assertTrue("aghost" in rev.parent_ids)
+
+ def test_fetch_invalid_ghosts(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("bla").modify("data")
+ dc.change_prop("bzr:ancestry:v3-none", "a ghost\n")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ dir = BzrDir.create("f", format.get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+
+ mapping = oldrepos.get_mapping()
+
+ rev = newrepos.get_revision(oldrepos.generate_revision_id(1, "", mapping))
+ self.assertEqual([oldrepos.generate_revision_id(0, "", mapping)], rev.parent_ids)
+
+ def test_fetch_complex_ids_dirs(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dir = dc.add_dir("dir")
+ dir.add_dir("dir/adir")
+ dc.change_prop("bzr:revision-info", "")
+ dc.change_prop("bzr:file-ids", "dir\tbloe\ndir/adir\tbla\n")
+ dc.close()
+
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_dir("bdir", "dir/adir")
+ dir = dc.open_dir("dir")
+ dir.delete("dir/adir")
+ dc.change_prop("bzr:revision-info", "properties: \n")
+ dc.change_prop("bzr:file-ids", "bdir\tbla\n")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ dir = BzrDir.create("f", format.get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ mapping = oldrepos.get_mapping()
+ tree = newrepos.revision_tree(oldrepos.generate_revision_id(2, "", mapping))
+ self.assertEquals("bloe", tree.path2id("dir"))
+ self.assertIs(None, tree.path2id("dir/adir"))
+ self.assertEquals("bla", tree.path2id("bdir"))
+
+ def test_fetch_complex_ids_files(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dir = dc.add_dir("dir")
+ dir.add_file("dir/adir").modify("contents")
+ dc.change_prop("bzr:revision-info", "")
+ dc.change_prop("bzr:file-ids", "dir\tbloe\ndir/adir\tbla\n")
+ dc.close()
+
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("bdir", "dir/adir")
+ dir = dc.open_dir("dir")
+ dir.delete("dir/adir")
+ dc.change_prop("bzr:revision-info", "properties: \n")
+ dc.change_prop("bzr:file-ids", "bdir\tbla\n")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ dir = BzrDir.create("f", format.get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ mapping = oldrepos.get_mapping()
+ tree = newrepos.revision_tree(oldrepos.generate_revision_id(2, "", mapping))
+ self.assertEquals("bloe", tree.path2id("dir"))
+ self.assertIs(None, tree.path2id("dir/adir"))
+ mutter('entries: %r' % tree.inventory.entries())
+ self.assertEquals("bla", tree.path2id("bdir"))
+
+ def test_store_branching_scheme(self):
+ self.make_checkout(self.repos_url, 'dc')
+ repository = Repository.open(self.repos_url)
+ repository.set_layout(TrunkLayout(42))
+ repository = Repository.open(self.repos_url)
+ self.assertEquals("trunk42", str(repository.get_mapping().scheme))
+
+ def test_revision_fileidmap(self):
+ dc = self.get_commit_editor(self.repos_url)
+ dc.add_file("foo").modify("data")
+ dc.change_prop("bzr:revision-info", "")
+ dc.change_prop("bzr:file-ids", "foo\tsomeid\n")
+ dc.close()
+
+ repository = Repository.open(self.repos_url)
+ repository.set_layout(RootLayout())
+ tree = repository.revision_tree(Branch.open(self.repos_url).last_revision())
+ self.assertEqual("someid", tree.inventory.path2id("foo"))
+ self.assertFalse("1@%s::foo" % repository.uuid in tree.inventory)
+
+ def test_commit_revision_id(self):
+ self.make_checkout(self.repos_url, "dc")
+ wt = WorkingTree.open("dc")
+ self.build_tree({'dc/foo/bla': "data", 'dc/bla': "otherdata"})
+ wt.add('bla')
+ wt.commit(message="data")
+
+ branch = Branch.open(self.repos_url)
+ builder = branch.get_commit_builder([branch.last_revision()],
+ revision_id="my-revision-id")
+ tree = branch.repository.revision_tree(branch.last_revision())
+ new_tree = copy(tree)
+ ie = new_tree.inventory.root
+ ie.revision = None
+ builder.record_entry_contents(ie, [tree.inventory], '', new_tree,
+ None)
+ builder.finish_inventory()
+ builder.commit("foo")
+
+ self.assertEqual("3 my-revision-id\n",
+ self.client_get_prop("dc",
+ "bzr:revision-id:v3-none", 2))
+
+ def test_commit_metadata(self):
+ self.make_checkout(self.repos_url, "dc")
+
+ wt = WorkingTree.open("dc")
+ self.build_tree({'dc/foo/bla': "data", 'dc/bla': "otherdata"})
+ wt.add('bla')
+ wt.commit(message="data")
+
+ branch = Branch.open(self.repos_url)
+ builder = branch.get_commit_builder([branch.last_revision()],
+ timestamp=4534.0, timezone=2, committer="fry",
+ revision_id="my-revision-id")
+ tree = branch.repository.revision_tree(branch.last_revision())
+ new_tree = copy(tree)
+ ie = new_tree.inventory.root
+ ie.revision = None
+ builder.record_entry_contents(ie, [tree.inventory], '', new_tree, None)
+ builder.finish_inventory()
+ builder.commit("foo")
+
+ self.assertEqual("3 my-revision-id\n",
+ self.client_get_prop("dc", "bzr:revision-id:v3-none", 2))
+
+ self.assertEqual(
+ "timestamp: 1970-01-01 01:15:36.000000000 +0000\ncommitter: fry\n",
+ self.client_get_prop("dc", "bzr:revision-info", 2))
+
+ def test_commit_parents(self):
+ self.make_checkout(self.repos_url, "dc")
+ self.build_tree({'dc/foo/bla': "data"})
+ self.client_add("dc/foo")
+ wt = WorkingTree.open("dc")
+ wt.set_pending_merges(["some-ghost-revision"])
+ self.assertEqual(["some-ghost-revision"], wt.get_parent_ids()[1:])
+ wt.commit(message="data")
+ self.assertEqual("some-ghost-revision\n",
+ self.client_get_prop(self.repos_url, "bzr:ancestry:v3-none", 1))
+ self.assertEqual((wt.branch.generate_revision_id(0), "some-ghost-revision"),
+ wt.branch.repository.get_revision(
+ wt.branch.last_revision()).parent_ids)
++
++ def test_push_unnecessary_merge(self):
++ from bzrlib.debug import debug_flags
++ debug_flags.add("commit")
++ debug_flags.add("fetch")
++ repos_url = self.make_repository("a")
++ bzrwt = BzrDir.create_standalone_workingtree("c",
++ format=format.get_rich_root_format())
++ self.build_tree({'c/registry/generic.c': "Tour"})
++ bzrwt.add("registry")
++ bzrwt.add("registry/generic.c")
++ revid1 = bzrwt.commit("Add initial directory + file",
++ rev_id="initialrevid")
++
++ # Push first branch into Subversion
++ newdir = BzrDir.open(repos_url+"/trunk")
++ newbranch = newdir.import_branch(bzrwt.branch)
++
++ c = ra.RemoteAccess(repos_url)
++ self.assertTrue(c.check_path("trunk/registry/generic.c", c.get_latest_revnum()) == core.NODE_FILE)
++
++ dc = self.get_commit_editor(repos_url)
++ trunk = dc.open_dir("trunk")
++ registry = trunk.open_dir("trunk/registry")
++ registry.open_file("trunk/registry/generic.c").modify("BLA")
++ dc.close()
++ mapping = newdir.find_repository().get_mapping()
++ merge_revid = newdir.find_repository().generate_revision_id(2, "trunk", mapping)
++
++ # Merge
++ self.build_tree({'c/registry/generic.c': "DE"})
++ bzrwt.add_pending_merge(merge_revid)
++ self.assertEquals(bzrwt.get_parent_ids()[1], merge_revid)
++ revid2 = bzrwt.commit("Merge something", rev_id="mergerevid")
++ bzr_parents = bzrwt.branch.repository.get_revision(revid2).parent_ids
++ trunk = Branch.open(repos_url + "/trunk")
++ trunk.pull(bzrwt.branch)
++
++ self.assertEquals(tuple(bzr_parents),
++ trunk.repository.get_revision(revid2).parent_ids)
++
++ self.assertEquals([revid1, revid2], trunk.revision_history())
++ self.assertEquals(
++ '1 initialrevid\n2 mergerevid\n',
++ self.client_get_prop(repos_url+"/trunk", SVN_PROP_BZR_REVISION_ID+"v3-trunk0",
++ c.get_latest_revnum()))
--- /dev/null
- self.assertEqual("svn-v3-none:%s::1" % (branch.repository.uuid), branch.generate_revision_id(1))
+# Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Branch tests."""
+
+from bzrlib import urlutils
+from bzrlib.branch import Branch
+from bzrlib.bzrdir import BzrDir
+from bzrlib.errors import NoSuchFile, NoSuchRevision, NotBranchError, NoSuchTag
+from bzrlib.repository import Repository
+from bzrlib.revision import NULL_REVISION
+from bzrlib.trace import mutter
+
+import os
+from unittest import TestCase
+
+from bzrlib.plugins.svn import core
+from bzrlib.plugins.svn.branch import FakeControlFiles, SvnBranchFormat
+from bzrlib.plugins.svn.convert import load_dumpfile
+from bzrlib.plugins.svn.mapping import SVN_PROP_BZR_REVISION_ID
+from bzrlib.plugins.svn.tests import SubversionTestCase
+
+class WorkingSubversionBranch(SubversionTestCase):
+ def test_last_rev_rev_hist(self):
+ repos_url = self.make_repository("a")
+ branch = Branch.open(repos_url)
+ branch.revision_history()
+ self.assertEqual(branch.generate_revision_id(0), branch.last_revision())
+
+ def test_get_branch_path_root(self):
+ repos_url = self.make_repository("a")
+ branch = Branch.open(repos_url)
+ self.assertEqual("", branch.get_branch_path())
+
+ def test_tags_dict(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ tags = dc.add_dir("tags")
+ tags.add_dir("tags/foo")
+ dc.add_dir("trunk")
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ self.assertEquals(["foo"], b.tags.get_tag_dict().keys())
+
+ def test_tag_set(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.add_dir("tags")
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.open_dir("trunk")
+ trunk.add_file("trunk/bla").modify()
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ b.tags.set_tag(u"mytag", b.repository.generate_revision_id(1, "trunk", b.repository.get_mapping()))
+
+ self.assertEquals(core.NODE_DIR,
+ b.repository.transport.check_path("tags/mytag", 3))
+
+ def test_tags_delete(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ tags = dc.add_dir("tags")
+ tags.add_dir("tags/foo")
+ dc.add_dir("trunk")
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ self.assertEquals(["foo"], b.tags.get_tag_dict().keys())
+ b.tags.delete_tag(u"foo")
+ b = Branch.open(repos_url + "/trunk")
+ self.assertEquals([], b.tags.get_tag_dict().keys())
+
+ def test_tag_lookup(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ tags = dc.add_dir("tags")
+ tags.add_dir("tags/foo")
+ dc.add_dir("trunk")
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ self.assertEquals("", b.project)
+ self.assertEquals(b.repository.generate_revision_id(1, "tags/foo", b.repository.get_mapping()), b.tags.lookup_tag("foo"))
+
+ def test_tag_lookup_nonexistant(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ self.assertRaises(NoSuchTag, b.tags.lookup_tag, "foo")
+
+ def test_tags_delete_nonexistent(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ b = Branch.open(repos_url + "/trunk")
+ self.assertRaises(NoSuchTag, b.tags.delete_tag, u"foo")
+
+ def test_get_branch_path_old(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk2", "trunk", 1)
+ dc.close()
+
+ branch = Branch.open(urlutils.join(repos_url, "trunk2"))
+ self.assertEqual("trunk2", branch.get_branch_path(2))
+ self.assertEqual("trunk", branch.get_branch_path(1))
+
+ def test_pull_internal(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ branches = dc.add_dir("branches")
+ branches.add_dir("branches/foo", "trunk", 1)
+ dc.close()
+
+ otherbranch = Branch.open(urlutils.join(repos_url, "branches", "foo"))
+ branch = Branch.open(urlutils.join(repos_url, "trunk"))
+ result = branch.pull(otherbranch)
+ self.assertEquals(branch.last_revision(), otherbranch.last_revision())
+ self.assertEquals(result.new_revid, otherbranch.last_revision())
+ self.assertEquals(result.old_revid, branch.revision_history()[0])
+ self.assertEquals(result.old_revno, 1)
+ self.assertEquals(result.new_revno, 2)
+ self.assertEquals(result.master_branch, None)
+ self.assertEquals(result.source_branch, otherbranch)
+ self.assertEquals(result.target_branch, branch)
+
+ def test_get_branch_path_subdir(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ branch = Branch.open(repos_url+"/trunk")
+ self.assertEqual("trunk", branch.get_branch_path())
+
+ def test_open_nonexistant(self):
+ repos_url = self.make_repository("a")
+ self.assertRaises(NotBranchError, Branch.open, repos_url + "/trunk")
+
+ def test_last_rev_rev_info(self):
+ repos_url = self.make_repository("a")
+ branch = Branch.open(repos_url)
+ self.assertEqual((1, branch.generate_revision_id(0)),
+ branch.last_revision_info())
+ branch.revision_history()
+ self.assertEqual((1, branch.generate_revision_id(0)),
+ branch.last_revision_info())
+
+ def test_lookup_revision_id_unknown(self):
+ repos_url = self.make_repository("a")
+ branch = Branch.open(repos_url)
+ self.assertRaises(NoSuchRevision,
+ lambda: branch.lookup_revision_id("bla"))
+
+ def test_lookup_revision_id(self):
+ repos_url = self.make_repository("a")
+ branch = Branch.open(repos_url)
+ self.assertEquals(0,
+ branch.lookup_revision_id(branch.last_revision()))
+
+ def test_set_parent(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ branch.set_parent("foobar")
+
+ def test_num_revnums(self):
+ repos_url = self.make_repository('a')
+ bzrdir = BzrDir.open(repos_url)
+ branch = bzrdir.open_branch()
+ self.assertEqual(branch.generate_revision_id(0),
+ branch.last_revision())
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("foo").modify()
+ dc.close()
+
+ bzrdir = BzrDir.open(repos_url)
+ branch = bzrdir.open_branch()
+ repos = bzrdir.find_repository()
+
+ mapping = repos.get_mapping()
+
+ self.assertEqual(repos.generate_revision_id(1, "", mapping),
+ branch.last_revision())
+
+ dc = self.get_commit_editor(repos_url)
+ dc.open_file("foo").modify()
+ dc.close()
+
+ branch = Branch.open(repos_url)
+ repos = Repository.open(repos_url)
+
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ branch.last_revision())
+
+ def test_set_revision_history_empty(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertRaises(NotImplementedError, branch.set_revision_history, [])
+
+ def test_set_revision_history_ghost(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file('trunk/foo').modify()
+ dc.close()
+
+ branch = Branch.open(repos_url+"/trunk")
+ self.assertRaises(NotImplementedError, branch.set_revision_history, ["nonexistantt"])
+
+ def test_set_revision_history(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file('trunk/foo').modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.open_dir("trunk")
+ trunk.add_file('trunk/bla').modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.open_dir("trunk")
+ trunk.add_file('trunk/bar').modify()
+ dc.close()
+
+ branch = Branch.open(repos_url+"/trunk")
+ orig_history = branch.revision_history()
+ branch.set_revision_history(orig_history[:-1])
+ self.assertEquals(orig_history[:-1], branch.revision_history())
+
+ def test_break_lock(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ branch.control_files.break_lock()
+
+ def test_repr(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertEqual("SvnBranch('%s')" % repos_url, branch.__repr__())
+
+ def test_get_physical_lock_status(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertFalse(branch.get_physical_lock_status())
+
+ def test_set_push_location(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertRaises(NotImplementedError, branch.set_push_location, [])
+
+ def test_get_parent(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertEqual(None, branch.get_parent())
+
+ def test_get_push_location(self):
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertIs(None, branch.get_push_location())
+
+ def test_revision_history(self):
+ repos_url = self.make_repository('a')
+
+ branch = Branch.open(repos_url)
+ self.assertEqual([branch.generate_revision_id(0)],
+ branch.revision_history())
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("foo").modify()
+ dc.change_prop(SVN_PROP_BZR_REVISION_ID+"v3-none",
+ "42 mycommit\n")
+ dc.close()
+
+ branch = Branch.open(repos_url)
+ repos = Repository.open(repos_url)
+
+ mapping = repos.get_mapping()
+
+ self.assertEqual([repos.generate_revision_id(0, "", mapping),
+ repos.generate_revision_id(1, "", mapping)],
+ branch.revision_history())
+
+ dc = self.get_commit_editor(repos_url)
+ dc.open_file("foo").modify()
+ dc.close()
+
+ branch = Branch.open(repos_url)
+ repos = Repository.open(repos_url)
+
+ mapping = repos.get_mapping()
+
+ self.assertEqual([
+ repos.generate_revision_id(0, "", mapping),
+ "mycommit",
+ repos.generate_revision_id(2, "", mapping)],
+ branch.revision_history())
+
+ def test_revision_id_to_revno_none(self):
+ """The None revid should map to revno 0."""
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertEquals(0, branch.revision_id_to_revno(NULL_REVISION))
+
+ def test_revision_id_to_revno_nonexistant(self):
+ """revision_id_to_revno() should raise NoSuchRevision if
+ the specified revision did not exist in the branch history."""
+ repos_url = self.make_repository('a')
+ branch = Branch.open(repos_url)
+ self.assertRaises(NoSuchRevision, branch.revision_id_to_revno, "bla")
+
+ def test_get_nick_none(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("foo").modify()
+ dc.close()
+
+ branch = Branch.open(repos_url)
+
+ self.assertEquals("a", branch.nick)
+
+ def test_get_nick_path(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("trunk")
+ dc.close()
+
+ branch = Branch.open(repos_url+"/trunk")
+
+ self.assertEqual("trunk", branch.nick)
+
+ def test_get_revprops(self):
+ repos_url = self.make_repository('a')
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("foo").modify()
+ dc.change_prop("bzr:revision-info",
+ "properties: \n\tbranch-nick: mybranch\n")
+ dc.close()
+
+ branch = Branch.open(repos_url)
+
+ rev = branch.repository.get_revision(branch.last_revision())
+
+ self.assertEqual("mybranch", rev.properties["branch-nick"])
+
+ def test_fetch_replace(self):
+ filename = os.path.join(self.test_dir, "dumpfile")
+ open(filename, 'w').write("""SVN-fs-dump-format-version: 2
+
+UUID: 6f95bc5c-e18d-4021-aca8-49ed51dbcb75
+
+Revision-number: 0
+Prop-content-length: 56
+Content-length: 56
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:25.270824Z
+PROPS-END
+
+Revision-number: 1
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:26.117512Z
+PROPS-END
+
+Node-path: trunk
+Node-kind: dir
+Node-action: add
+Prop-content-length: 10
+Content-length: 10
+
+PROPS-END
+
+
+Node-path: trunk/hosts
+Node-kind: file
+Node-action: add
+Prop-content-length: 10
+Text-content-length: 4
+Text-content-md5: 771ec3328c29d17af5aacf7f895dd885
+Content-length: 14
+
+PROPS-END
+hej1
+
+Revision-number: 2
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:27.130044Z
+PROPS-END
+
+Node-path: trunk/hosts
+Node-kind: file
+Node-action: change
+Text-content-length: 4
+Text-content-md5: 6c2479dbb342b8df96d84db7ab92c412
+Content-length: 4
+
+hej2
+
+Revision-number: 3
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:28.114350Z
+PROPS-END
+
+Node-path: trunk/hosts
+Node-kind: file
+Node-action: change
+Text-content-length: 4
+Text-content-md5: 368cb8d3db6186e2e83d9434f165c525
+Content-length: 4
+
+hej3
+
+Revision-number: 4
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:29.129563Z
+PROPS-END
+
+Node-path: branches
+Node-kind: dir
+Node-action: add
+Prop-content-length: 10
+Content-length: 10
+
+PROPS-END
+
+
+Revision-number: 5
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:31.130508Z
+PROPS-END
+
+Node-path: branches/foobranch
+Node-kind: dir
+Node-action: add
+Node-copyfrom-rev: 4
+Node-copyfrom-path: trunk
+
+
+Revision-number: 6
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:33.129149Z
+PROPS-END
+
+Node-path: branches/foobranch/hosts
+Node-kind: file
+Node-action: delete
+
+Node-path: branches/foobranch/hosts
+Node-kind: file
+Node-action: add
+Node-copyfrom-rev: 2
+Node-copyfrom-path: trunk/hosts
+
+
+
+
+Revision-number: 7
+Prop-content-length: 94
+Content-length: 94
+
+K 7
+svn:log
+V 0
+
+K 10
+svn:author
+V 0
+
+K 8
+svn:date
+V 27
+2006-07-30T12:41:34.136423Z
+PROPS-END
+
+Node-path: branches/foobranch/hosts
+Node-kind: file
+Node-action: change
+Text-content-length: 8
+Text-content-md5: 0e328d3517a333a4879ebf3d88fd82bb
+Content-length: 8
+
+foohosts""")
+ os.mkdir("new")
+ os.mkdir("old")
+
+ load_dumpfile("dumpfile", "old")
+
+ url = "old/branches/foobranch"
+ mutter('open %r' % url)
+ olddir = BzrDir.open(url)
+
+ newdir = olddir.sprout("new")
+
+ newbranch = newdir.open_branch()
+
+ oldbranch = Branch.open(url)
+
+ uuid = "6f95bc5c-e18d-4021-aca8-49ed51dbcb75"
+ newbranch.lock_read()
+ tree = newbranch.repository.revision_tree(oldbranch.generate_revision_id(7))
+
+ host_fileid = tree.inventory.path2id("hosts")
+
+ self.assertVersionsPresentEquals(newbranch.repository.texts,
+ host_fileid, [
+ oldbranch.generate_revision_id(6),
+ oldbranch.generate_revision_id(7)])
+ newbranch.unlock()
+
+
+ def test_fetch_odd(self):
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/hosts").modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.open_dir("trunk")
+ trunk.open_file("trunk/hosts").modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ dc.open_file("trunk/hosts").modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_dir("branches")
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ branches = dc.open_dir("branches")
+ branches.add_dir("branches/foobranch", "trunk")
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ branches = dc.open_dir("branches")
+ foobranch = branches.open_dir("branches/foobranch")
+ foobranch.open_file("branches/foobranch/hosts").modify()
+ dc.close()
+
+ os.mkdir("new")
+
+ url = repos_url+"/branches/foobranch"
+ mutter('open %r' % url)
+ olddir = BzrDir.open(url)
+
+ newdir = olddir.sprout("new")
+
+ newbranch = newdir.open_branch()
+ oldbranch = olddir.open_branch()
+
+ uuid = olddir.find_repository().uuid
+ tree = newbranch.repository.revision_tree(
+ oldbranch.generate_revision_id(6))
+ transaction = newbranch.repository.get_transaction()
+ newbranch.repository.lock_read()
+ texts = newbranch.repository.texts
+ host_fileid = tree.inventory.path2id("hosts")
+ mapping = oldbranch.repository.get_mapping()
+ self.assertVersionsPresentEquals(texts, host_fileid, [
+ mapping.revision_id_foreign_to_bzr((uuid, 1, "trunk")),
+ mapping.revision_id_foreign_to_bzr((uuid, 2, "trunk")),
+ mapping.revision_id_foreign_to_bzr((uuid, 3, "trunk")),
+ oldbranch.generate_revision_id(6)])
+ newbranch.repository.unlock()
+
+ def assertVersionsPresentEquals(self, texts, fileid, versions):
+ self.assertEqual(set([(fileid, v) for v in versions]),
+ set(filter(lambda (fid, rid): fid == fileid, texts.keys())))
+
+ def test_check(self):
+ self.make_repository('d')
+ branch = Branch.open('d')
+ result = branch.check()
+ self.assertEqual(branch, result.branch)
+
+ def test_generate_revision_id(self):
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ bla = dc.add_dir("bla")
+ bla.add_dir("bla/bloe")
+ dc.close()
+
+ branch = Branch.open('d')
++ mapping = branch.repository.get_mapping()
++ self.assertEqual(mapping.revision_id_foreign_to_bzr((branch.repository.uuid, 1, "")), branch.generate_revision_id(1))
+
+ def test_create_checkout(self):
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/hosts").modify()
+ dc.close()
+
+ url = repos_url+"/trunk"
+ oldbranch = Branch.open(url)
+
+ newtree = oldbranch.create_checkout("e")
+ self.assertTrue(newtree.branch.repository.has_revision(
+ oldbranch.generate_revision_id(1)))
+
+ self.assertTrue(os.path.exists("e/.bzr"))
+ self.assertFalse(os.path.exists("e/.svn"))
+
+ def test_create_checkout_lightweight(self):
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/hosts")
+ dc.close()
+
+ oldbranch = Branch.open(repos_url+"/trunk")
+ newtree = oldbranch.create_checkout("e", lightweight=True)
+ self.assertEqual(oldbranch.generate_revision_id(1), newtree.base_revid)
+ self.assertTrue(os.path.exists("e/.svn"))
+ self.assertFalse(os.path.exists("e/.bzr"))
+
+ def test_create_checkout_lightweight_stop_rev(self):
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/hosts").modify()
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.open_dir("trunk")
+ trunk.open_file("trunk/hosts").modify()
+ dc.close()
+
+ url = repos_url+"/trunk"
+ oldbranch = Branch.open(url)
+
+ newtree = oldbranch.create_checkout("e", revision_id=
+ oldbranch.generate_revision_id(1), lightweight=True)
+ self.assertEqual(oldbranch.generate_revision_id(1),
+ newtree.base_revid)
+ self.assertTrue(os.path.exists("e/.svn"))
+ self.assertFalse(os.path.exists("e/.bzr"))
+
+ def test_fetch_branch(self):
+ repos_url = self.make_client('d', 'sc')
+
+ sc = self.get_commit_editor(repos_url)
+ foo = sc.add_dir("foo")
+ foo.add_file("foo/bla").modify()
+ sc.close()
+
+ olddir = BzrDir.open("sc")
+
+ os.mkdir("dc")
+
+ newdir = olddir.sprout('dc')
+
+ self.assertEqual(
+ olddir.open_branch().last_revision(),
+ newdir.open_branch().last_revision())
+
+ def test_fetch_dir_upgrade(self):
+ repos_url = self.make_client('d', 'sc')
+
+ sc = self.get_commit_editor(repos_url)
+ trunk = sc.add_dir("trunk")
+ mylib = trunk.add_dir("trunk/mylib")
+ mylib.add_file("trunk/mylib/bla").modify()
+ sc.add_dir("branches")
+ sc.close()
+
+ sc = self.get_commit_editor(repos_url)
+ branches = sc.open_dir("branches")
+ branches.add_dir("branches/abranch", "trunk/mylib")
+ sc.close()
+
+ self.client_update('sc')
+ olddir = BzrDir.open("sc/branches/abranch")
+
+ os.mkdir("dc")
+
+ newdir = olddir.sprout('dc')
+
+ self.assertEqual(
+ olddir.open_branch().last_revision(),
+ newdir.open_branch().last_revision())
+
+ def test_fetch_branch_downgrade(self):
+ repos_url = self.make_client('d', 'sc')
+
+ sc = self.get_commit_editor(repos_url)
+ sc.add_dir("trunk")
+ branches = sc.add_dir("branches")
+ abranch = branches.add_dir("branches/abranch")
+ abranch.add_file("branches/abranch/bla").modify()
+ sc.close()
+
+ sc = self.get_commit_editor(repos_url)
+ trunk = sc.open_dir("trunk")
+ sc.add_dir("trunk/mylib", "branches/abranch")
+ sc.close()
+
+ self.client_update('sc')
+ olddir = BzrDir.open("sc/trunk")
+
+ os.mkdir("dc")
+
+ newdir = olddir.sprout('dc')
+
+ self.assertEqual(
+ olddir.open_branch().last_revision(),
+ newdir.open_branch().last_revision())
+
+
+
+ def test_ghost_workingtree(self):
+ # Looks like bazaar has trouble creating a working tree of a
+ # revision that has ghost parents
+ repos_url = self.make_client('d', 'sc')
+
+ sc = self.get_commit_editor(repos_url)
+ foo = sc.add_dir("foo")
+ foo.add_file("foo/bla").modify()
+ sc.change_prop("bzr:ancestry:v3-none", "some-ghost\n")
+ sc.close()
+
+ olddir = BzrDir.open("sc")
+
+ os.mkdir("dc")
+
+ newdir = olddir.sprout('dc')
+ newdir.find_repository().get_revision(
+ newdir.open_branch().last_revision())
+ newdir.find_repository().get_revision_inventory(
+ newdir.open_branch().last_revision())
+
+
+class TestFakeControlFiles(TestCase):
+ def test_get_utf8(self):
+ f = FakeControlFiles()
+ self.assertRaises(NoSuchFile, f.get_utf8, "foo")
+
+
+ def test_get(self):
+ f = FakeControlFiles()
+ self.assertRaises(NoSuchFile, f.get, "foobla")
+
+
+class BranchFormatTests(TestCase):
+ def setUp(self):
+ self.format = SvnBranchFormat()
+
+ def test_initialize(self):
+ self.assertRaises(NotImplementedError, self.format.initialize, None)
+
+ def test_get_format_string(self):
+ self.assertEqual("Subversion Smart Server",
+ self.format.get_format_string())
+
+ def test_get_format_description(self):
+ self.assertEqual("Subversion Smart Server",
+ self.format.get_format_description())
--- /dev/null
- self.assertEqual(['svn-v3-none:6987ef2d-cd6b-461f-9991-6f1abef3bd59::0'], branch.all_revision_ids())
+# Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Full repository conversion tests."""
+
+from bzrlib.branch import Branch
+from bzrlib.bzrdir import BzrDir, format_registry
+from bzrlib.errors import NotBranchError, NoSuchFile, IncompatibleRepositories
+from bzrlib.urlutils import local_path_to_url
+from bzrlib.repository import Repository
+from bzrlib.tests import TestCaseInTempDir
+
+import os, sys
+
+from bzrlib.plugins.svn import repos
+from bzrlib.plugins.svn.layout import RootLayout, TrunkLayout
+from bzrlib.plugins.svn.convert import convert_repository, NotDumpFile, load_dumpfile
+from bzrlib.plugins.svn.format import get_rich_root_format
+from bzrlib.plugins.svn.tests import SubversionTestCase
+
+class TestLoadDumpfile(TestCaseInTempDir):
+ def test_loaddumpfile(self):
+ dumpfile = os.path.join(self.test_dir, "dumpfile")
+ open(dumpfile, 'w').write(
+"""SVN-fs-dump-format-version: 2
+
+UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
+
+Revision-number: 0
+Prop-content-length: 56
+Content-length: 56
+
+K 8
+svn:date
+V 27
+2006-07-02T13:14:51.972532Z
+PROPS-END
+""")
+ load_dumpfile(dumpfile, "d")
+ fs = repos.Repository("d").fs()
+ self.assertEqual("6987ef2d-cd6b-461f-9991-6f1abef3bd59",
+ fs.get_uuid())
+
+ def test_loaddumpfile_invalid(self):
+ dumpfile = os.path.join(self.test_dir, "dumpfile")
+ open(dumpfile, 'w').write("""FooBar\n""")
+ self.assertRaises(NotDumpFile, load_dumpfile, dumpfile, "d")
+
+
+class TestConversion(SubversionTestCase):
+ def setUp(self):
+ super(TestConversion, self).setUp()
+ self.repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor()
+ t = dc.add_dir("trunk")
+ t.add_file("trunk/file").modify("data")
+ bs = dc.add_dir("branches")
+ ab = bs.add_dir("branches/abranch")
+ ab.add_file("branches/abranch/anotherfile").modify("data2")
+ dc.close()
+
+ dc = self.get_commit_editor()
+ t = dc.open_dir("trunk")
+ t.open_file("trunk/file").modify("otherdata")
+ dc.close()
+
+ def get_commit_editor(self):
+ return super(TestConversion, self).get_commit_editor(self.repos_url)
+
+ def test_sets_parent_urls(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0),
+ all=False, create_shared_repo=True)
+ self.assertEquals(self.repos_url+"/trunk",
+ Branch.open("e/trunk").get_parent())
+ self.assertEquals(self.repos_url+"/branches/abranch",
+ Branch.open("e/branches/abranch").get_parent())
+
+ def test_fetch_alive(self):
+ dc = self.get_commit_editor()
+ bs = dc.open_dir("branches")
+ sb = bs.add_dir("branches/somebranch")
+ sb.add_file("branches/somebranch/somefile").modify('data')
+ dc.close()
+
+ dc = self.get_commit_editor()
+ bs = dc.open_dir("branches")
+ bs.delete("branches/somebranch")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ convert_repository(oldrepos, "e",
+ TrunkLayout(0),
+ all=False, create_shared_repo=True)
+ newrepos = Repository.open("e")
+ oldrepos.set_layout(TrunkLayout(0))
+ self.assertFalse(newrepos.has_revision(oldrepos.generate_revision_id(2, "branches/somebranch", oldrepos.get_mapping())))
+
+ def test_fetch_filebranch(self):
+ dc = self.get_commit_editor()
+ bs = dc.open_dir("branches")
+ bs.add_file("branches/somebranch").modify('data')
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ convert_repository(oldrepos, "e", TrunkLayout(0))
+ newrepos = Repository.open("e")
+ oldrepos.set_layout(TrunkLayout(0))
+ self.assertFalse(newrepos.has_revision(oldrepos.generate_revision_id(2, "branches/somebranch", oldrepos.get_mapping())))
+
+ def test_fetch_dead(self):
+ dc = self.get_commit_editor()
+ bs = dc.open_dir("branches")
+ sb = bs.add_dir("branches/somebranch")
+ sb.add_file("branches/somebranch/somefile").modify('data')
+ dc.close()
+
+ dc = self.get_commit_editor()
+ bs = dc.open_dir("branches")
+ bs.delete("branches/somebranch")
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ convert_repository(oldrepos, "e", TrunkLayout(0),
+ all=True, create_shared_repo=True)
+ newrepos = Repository.open("e")
+ self.assertTrue(newrepos.has_revision(
+ oldrepos.generate_revision_id(3, "branches/somebranch", oldrepos.get_mapping())))
+
+ def test_fetch_filter(self):
+ dc = self.get_commit_editor()
+ branches = dc.open_dir("branches")
+ dc.add_dir("branches/somebranch")
+ dc.add_file("branches/somebranch/somefile").modify('data')
+ dc.close()
+
+ dc = self.get_commit_editor()
+ branches = dc.open_dir("branches")
+ ab = branches.add_dir("branches/anotherbranch")
+ ab.add_file("branches/anotherbranch/somefile").modify('data')
+ dc.close()
+
+ oldrepos = Repository.open(self.repos_url)
+ convert_repository(oldrepos, "e", TrunkLayout(0),
+ create_shared_repo=True,
+ filter_branch=lambda branch: branch.get_branch_path().endswith("somebranch"))
+ newrepos = Repository.open("e")
+ self.assertTrue(os.path.exists("e/branches/somebranch"))
+ self.assertFalse(os.path.exists("e/branches/anotherbranch"))
+
+ def test_shared_import_continue(self):
+ dir = BzrDir.create("e", format=get_rich_root_format())
+ dir.create_repository(shared=True)
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ self.assertTrue(Repository.open("e").is_shared())
+
+ def test_shared_import_continue_remove(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ dc = self.get_commit_editor()
+ dc.delete("trunk")
+ dc.close()
+
+ dc = self.get_commit_editor()
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/file").modify()
+ dc.close()
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ def test_shared_import_remove_nokeep(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ dc = self.get_commit_editor()
+ dc.delete("trunk")
+ dc.close()
+
+ self.assertTrue(os.path.exists("e/trunk"))
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ self.assertFalse(os.path.exists("e/trunk"))
+
+ def test_shared_import_continue_with_wt(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), working_trees=True)
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), working_trees=True)
+
+ def test_shared_import_rootlayout_empty(self):
+ dir = BzrDir.create("e", format=get_rich_root_format())
+ dir.create_repository(shared=True)
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ RootLayout(), create_shared_repo=True)
+
+ def test_shared_import_with_wt(self):
+ dir = BzrDir.create("e", format=get_rich_root_format())
+ dir.create_repository(shared=True)
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True,
+ working_trees=True)
+
+ self.assertTrue(os.path.isfile(os.path.join(
+ self.test_dir, "e", "trunk", "file")))
+
+ def test_shared_import_without_wt(self):
+ dir = BzrDir.create("e", format=get_rich_root_format())
+ dir.create_repository(shared=True)
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True,
+ working_trees=False)
+
+ self.assertFalse(os.path.isfile(os.path.join(
+ self.test_dir, "e", "trunk", "file")))
+
+ def test_shared_import_old_repos_fails(self):
+ dir = BzrDir.create("e", format=format_registry.make_bzrdir('knit'))
+ dir.create_repository(shared=True)
+
+ self.assertRaises(IncompatibleRepositories,
+ lambda: convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True,
+ working_trees=False))
+
+ def test_shared_import_continue_branch(self):
+ oldrepos = Repository.open(self.repos_url)
+ convert_repository(oldrepos, "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ mapping = oldrepos.get_mapping()
+
+ dc = self.get_commit_editor()
+ trunk = dc.open_dir("trunk")
+ trunk.open_file("trunk/file").modify()
+ dc.close()
+
+ self.assertEqual(
+ Repository.open(self.repos_url).generate_revision_id(2, "trunk", mapping),
+ Branch.open("e/trunk").last_revision())
+
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ self.assertEqual(Repository.open(self.repos_url).generate_revision_id(3, "trunk", mapping),
+ Branch.open("e/trunk").last_revision())
+
+
+ def test_shared_import(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=True)
+
+ self.assertTrue(Repository.open("e").is_shared())
+
+ def test_simple(self):
+ convert_repository(Repository.open(self.repos_url), os.path.join(self.test_dir, "e"), TrunkLayout(0))
+ self.assertTrue(os.path.isdir(os.path.join(self.test_dir, "e", "trunk")))
+ self.assertTrue(os.path.isdir(os.path.join(self.test_dir, "e", "branches", "abranch")))
+
+ def test_convert_to_nonexistant(self):
+ self.assertRaises(NoSuchFile, convert_repository, Repository.open(self.repos_url), os.path.join(self.test_dir, "e", "foo", "bar"), TrunkLayout(0))
+
+ def test_notshared_import(self):
+ convert_repository(Repository.open(self.repos_url), "e",
+ TrunkLayout(0), create_shared_repo=False)
+
+ self.assertRaises(NotBranchError, Repository.open, "e")
+
+class TestConversionFromDumpfile(SubversionTestCase):
+ def test_dumpfile_open_empty(self):
+ dumpfile = os.path.join(self.test_dir, "dumpfile")
+ open(dumpfile, 'w').write(
+"""SVN-fs-dump-format-version: 2
+
+UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
+
+Revision-number: 0
+Prop-content-length: 56
+Content-length: 56
+
+K 8
+svn:date
+V 27
+2006-07-02T13:14:51.972532Z
+PROPS-END
+""")
+ branch_path = os.path.join(self.test_dir, "f")
+ repos = self.load_dumpfile(dumpfile, 'g')
+ convert_repository(repos, branch_path, RootLayout())
+ branch = Repository.open(branch_path)
++ mapping = branch.repository.get_mapping()
++ self.assertEqual([mapping.revision_id_foreign_to_bzr(("6987ef2d-cd6b-461f-9991-6f1abef3bd59", 0, ""))], branch.all_revision_ids())
+ Branch.open(branch_path)
+
+ def load_dumpfile(self, dumpfile, target_path):
+ load_dumpfile(dumpfile, target_path)
+ return Repository.open(target_path)
+
+ def test_dumpfile_open_empty_trunk(self):
+ dumpfile = os.path.join(self.test_dir, "dumpfile")
+ open(dumpfile, 'w').write(
+"""SVN-fs-dump-format-version: 2
+
+UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
+
+Revision-number: 0
+Prop-content-length: 56
+Content-length: 56
+
+K 8
+svn:date
+V 27
+2006-07-02T13:14:51.972532Z
+PROPS-END
+""")
+ branch_path = os.path.join(self.test_dir, "f")
+ repos = self.load_dumpfile(dumpfile, 'g')
+ convert_repository(repos, branch_path, TrunkLayout(0))
+ repository = Repository.open(branch_path)
+ self.assertEqual([], repository.all_revision_ids())
+ self.assertRaises(NotBranchError, Branch.open, branch_path)
+
+ def test_open_internal(self):
+ filename = os.path.join(self.test_dir, "dumpfile")
+ open(filename, 'w').write(
+"""SVN-fs-dump-format-version: 2
+
+UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
+
+Revision-number: 0
+Prop-content-length: 56
+Content-length: 56
+
+K 8
+svn:date
+V 27
+2006-07-02T13:14:51.972532Z
+PROPS-END
+
+Revision-number: 1
+Prop-content-length: 109
+Content-length: 109
+
+K 7
+svn:log
+V 9
+Add trunk
+K 10
+svn:author
+V 6
+jelmer
+K 8
+svn:date
+V 27
+2006-07-02T13:58:02.528258Z
+PROPS-END
+
+Node-path: trunk
+Node-kind: dir
+Node-action: add
+Prop-content-length: 10
+Content-length: 10
+
+PROPS-END
+
+
+Node-path: trunk/bla
+Node-kind: file
+Node-action: add
+Prop-content-length: 10
+Text-content-length: 5
+Text-content-md5: 6137cde4893c59f76f005a8123d8e8e6
+Content-length: 15
+
+PROPS-END
+data
+
+
+""")
+ repos = self.load_dumpfile(filename, 'g')
+ convert_repository(repos, os.path.join(self.test_dir, "e"),
+ TrunkLayout(0))
+ mapping = repos.get_mapping()
+ abspath = self.test_dir
+ if sys.platform == 'win32':
+ abspath = '/' + abspath
+ branch = Branch.open(os.path.join(self.test_dir, "e", "trunk"))
+ self.assertEqual(local_path_to_url(os.path.join(self.test_dir, "e", "trunk")), branch.base.rstrip("/"))
+ self.assertEqual(mapping.revision_id_foreign_to_bzr(("6987ef2d-cd6b-461f-9991-6f1abef3bd59", 1, 'trunk')), branch.last_revision())
+
--- /dev/null
- def test_push_unnecessary_merge(self):
- from bzrlib.debug import debug_flags
- debug_flags.add("commit")
- debug_flags.add("fetch")
- repos_url = self.make_repository("a")
- bzrwt = BzrDir.create_standalone_workingtree("c",
- format=format.get_rich_root_format())
- self.build_tree({'c/registry/generic.c': "Tour"})
- bzrwt.add("registry")
- bzrwt.add("registry/generic.c")
- revid1 = bzrwt.commit("Add initial directory + file",
- rev_id="initialrevid")
-
- # Push first branch into Subversion
- newdir = BzrDir.open(repos_url+"/trunk")
- newbranch = newdir.import_branch(bzrwt.branch)
-
- c = ra.RemoteAccess(repos_url)
- self.assertTrue(c.check_path("trunk/registry/generic.c", c.get_latest_revnum()) == core.NODE_FILE)
-
- dc = self.get_commit_editor(repos_url)
- trunk = dc.open_dir("trunk")
- registry = trunk.open_dir("trunk/registry")
- registry.open_file("trunk/registry/generic.c").modify("BLA")
- dc.close()
- mapping = newdir.find_repository().get_mapping()
- merge_revid = newdir.find_repository().generate_revision_id(2, "trunk", mapping)
-
- # Merge
- self.build_tree({'c/registry/generic.c': "DE"})
- bzrwt.add_pending_merge(merge_revid)
- self.assertEquals(bzrwt.get_parent_ids()[1], merge_revid)
- revid2 = bzrwt.commit("Merge something", rev_id="mergerevid")
- bzr_parents = bzrwt.branch.repository.get_revision(revid2).parent_ids
- trunk = Branch.open(repos_url + "/trunk")
- trunk.pull(bzrwt.branch)
-
- self.assertEquals(tuple(bzr_parents),
- trunk.repository.get_revision(revid2).parent_ids)
-
- self.assertEquals([revid1, revid2], trunk.revision_history())
- self.assertEquals(
- '1 initialrevid\n2 mergerevid\n',
- self.client_get_prop(repos_url+"/trunk", SVN_PROP_BZR_REVISION_ID+"v3-trunk0",
- c.get_latest_revnum()))
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+from bzrlib.branch import Branch
+from bzrlib.bzrdir import BzrDir
+from bzrlib.errors import AlreadyBranchError, BzrError, DivergedBranches
+from bzrlib.merge import Merger, Merge3Merger
+from bzrlib.osutils import has_symlinks
+from bzrlib.progress import DummyProgress
+from bzrlib.repository import Repository
+from bzrlib.trace import mutter
+
+import os
+
+from bzrlib.plugins.svn import core, format, ra
+from bzrlib.plugins.svn.errors import MissingPrefix
+from bzrlib.plugins.svn.commit import push, dpush
+from bzrlib.plugins.svn.mapping import SVN_PROP_BZR_REVISION_ID
+from bzrlib.plugins.svn.tests import SubversionTestCase
+
+class TestDPush(SubversionTestCase):
+ def setUp(self):
+ super(TestDPush, self).setUp()
+ self.repos_url = self.make_repository('d')
+
+ dc = self.commit_editor()
+ foo = dc.add_dir("foo")
+ foo.add_file("foo/bla").modify("data")
+ dc.close()
+
+ self.svndir = BzrDir.open(self.repos_url)
+ os.mkdir("dc")
+ self.bzrdir = self.svndir.sprout("dc")
+
+ def commit_editor(self):
+ return self.get_commit_editor(self.repos_url)
+
+ def test_change_single(self):
+ self.build_tree({'dc/foo/bla': 'other data'})
+ wt = self.bzrdir.open_workingtree()
+ newid = wt.commit(message="Commit from Bzr")
+
+ revid_map = dpush(self.svndir.open_branch(), self.bzrdir.open_branch())
+
+ self.assertEquals([newid], revid_map.keys())
+
+ c = ra.RemoteAccess(self.repos_url)
+ (entries, fetch_rev, props) = c.get_dir("", c.get_latest_revnum())
+ self.assertEquals(set(['svn:entry:committed-rev',
+ 'svn:entry:last-author', 'svn:entry:uuid',
+ 'svn:entry:committed-date']), set(props.keys()))
+
+ r = self.svndir.find_repository()
+ self.assertEquals([r.generate_revision_id(
+ c.get_latest_revnum(),
+ "",
+ r.get_mapping())], revid_map.values())
+
+ def test_change_multiple(self):
+ self.build_tree({'dc/foo/bla': 'other data'})
+ wt = self.bzrdir.open_workingtree()
+ self.build_tree({'dc/foo/bla': 'other data'})
+ newid1 = wt.commit(message="Commit from Bzr")
+ self.build_tree({'dc/foo/bla': 'yet other data'})
+ newid2 = wt.commit(message="Commit from Bzr")
+
+ revid_map = dpush(self.svndir.open_branch(), self.bzrdir.open_branch())
+
+ self.assertEquals(set([newid1, newid2]), set(revid_map.keys()))
+
+ c = ra.RemoteAccess(self.repos_url)
+ (entries, fetch_rev, props) = c.get_dir("", c.get_latest_revnum())
+ self.assertEquals(set(['svn:entry:committed-rev',
+ 'svn:entry:last-author', 'svn:entry:uuid',
+ 'svn:entry:committed-date']), set(props.keys()))
+
+ r = self.svndir.find_repository()
+ self.assertEquals(set([r.generate_revision_id(
+ rev,
+ "",
+ r.get_mapping()) for rev in (c.get_latest_revnum()-1, c.get_latest_revnum())]),
+ set(revid_map.values()))
+
+ def test_diverged(self):
+ dc = self.commit_editor()
+ foo = dc.open_dir("foo")
+ foo.add_file("foo/bar").modify("data")
+ dc.close()
+
+ svndir = BzrDir.open(self.repos_url)
+
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.assertRaises(DivergedBranches,
+ dpush, svndir.open_branch(),
+ self.bzrdir.open_branch())
+
+
+class TestPush(SubversionTestCase):
+ def setUp(self):
+ super(TestPush, self).setUp()
+ self.repos_url = self.make_repository('d')
+
+ dc = self.commit_editor()
+ foo = dc.add_dir("foo")
+ foo.add_file("foo/bla").modify("data")
+ dc.close()
+
+ self.svndir = BzrDir.open(self.repos_url)
+ os.mkdir("dc")
+ self.bzrdir = self.svndir.sprout("dc")
+
+ def commit_editor(self):
+ return self.get_commit_editor(self.repos_url)
+
+ def test_empty(self):
+ svnbranch = self.svndir.open_branch()
+ bzrbranch = self.bzrdir.open_branch()
+ result = svnbranch.pull(bzrbranch)
+ self.assertEqual(0, result.new_revno - result.old_revno)
+ self.assertEqual(svnbranch.revision_history(),
+ bzrbranch.revision_history())
+
+ def test_child(self):
+ dc = self.commit_editor()
+ foo = dc.open_dir("foo")
+ foo.add_file("foo/bar").modify("data")
+ dc.close()
+
+ svnbranch = self.svndir.open_branch()
+ bzrbranch = self.bzrdir.open_branch()
+ result = svnbranch.pull(bzrbranch)
+ self.assertEqual(0, result.new_revno - result.old_revno)
+
+ def test_diverged(self):
+ dc = self.commit_editor()
+ foo = dc.open_dir("foo")
+ foo.add_file("foo/bar").modify("data")
+ dc.close()
+
+ svndir = BzrDir.open(self.repos_url)
+
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.assertRaises(DivergedBranches,
+ svndir.open_branch().pull,
+ self.bzrdir.open_branch())
+
+ def test_change(self):
+ self.build_tree({'dc/foo/bla': 'other data'})
+ wt = self.bzrdir.open_workingtree()
+ newid = wt.commit(message="Commit from Bzr")
+
+ svnbranch = self.svndir.open_branch()
+ svnbranch.pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ self.assertEquals(newid, svnbranch.last_revision())
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertEqual(newid, inv[inv.path2id('foo/bla')].revision)
+ self.assertEqual(wt.branch.last_revision(),
+ repos.generate_revision_id(2, "", mapping))
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ self.svndir.open_branch().last_revision())
+ self.assertEqual("other data",
+ repos.revision_tree(repos.generate_revision_id(2, "",
+ mapping)).get_file_text(inv.path2id("foo/bla")))
+
+ def test_simple(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertTrue(inv.has_filename('file'))
+ self.assertEquals(wt.branch.last_revision(),
+ repos.generate_revision_id(2, "", mapping))
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ self.svndir.open_branch().last_revision())
+
+ def test_override_revprops(self):
+ self.svndir.find_repository().get_config().set_user_option("override-svn-revprops", "True")
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr", committer="Sombody famous", timestamp=1012604400, timezone=0)
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ self.assertEquals(("Sombody famous", "2002-02-01T23:00:00.000000Z", "Commit from Bzr"),
+ self.client_log(self.repos_url, 0, 2)[2][1:])
+
+ def test_empty_file(self):
+ self.build_tree({'dc/file': ''})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertTrue(inv.has_filename('file'))
+ self.assertEquals(wt.branch.last_revision(),
+ repos.generate_revision_id(2, "", mapping))
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ self.svndir.open_branch().last_revision())
+
+ def test_symlink(self):
+ if not has_symlinks():
+ return
+ os.symlink("bla", "dc/south")
+ assert os.path.islink("dc/south")
+ wt = self.bzrdir.open_workingtree()
+ wt.add('south')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertTrue(inv.has_filename('south'))
+ self.assertEquals('symlink', inv[inv.path2id('south')].kind)
+ self.assertEquals('bla', inv[inv.path2id('south')].symlink_target)
+
+ def test_pull_after_push(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertTrue(inv.has_filename('file'))
+ self.assertEquals(wt.branch.last_revision(),
+ repos.generate_revision_id(2, "", mapping))
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ self.svndir.open_branch().last_revision())
+
+ self.bzrdir.open_branch().pull(self.svndir.open_branch())
+
+ self.assertEqual(repos.generate_revision_id(2, "", mapping),
+ self.bzrdir.open_branch().last_revision())
+
+ def test_branch_after_push(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ os.mkdir("b")
+ repos = self.svndir.sprout("b")
+
+ self.assertEqual(Branch.open("dc").revision_history(),
+ Branch.open("b").revision_history())
+
+ def test_fetch_preserves_file_revid(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ self.build_tree({'dc/foo/bla': 'data43243242'})
+ revid = wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ os.mkdir("b")
+ repos = self.svndir.sprout("b")
+
+ b = Branch.open("b")
+
+ def check_tree_revids(rtree):
+ self.assertEqual(rtree.inventory[rtree.path2id("file")].revision,
+ revid)
+ self.assertEqual(rtree.inventory[rtree.path2id("foo")].revision,
+ b.revision_history()[1])
+ self.assertEqual(rtree.inventory[rtree.path2id("foo/bla")].revision,
+ revid)
+ self.assertEqual(rtree.inventory.root.revision, b.revision_history()[0])
+
+ check_tree_revids(wt.branch.repository.revision_tree(b.last_revision()))
+
+ check_tree_revids(b.repository.revision_tree(b.last_revision()))
+ bc = self.svndir.open_branch()
+ check_tree_revids(bc.repository.revision_tree(bc.last_revision()))
+
+ def test_message(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+ mapping = repos.get_mapping()
+ self.assertEqual("Commit from Bzr",
+ repos.get_revision(repos.generate_revision_id(2, "", mapping)).message)
+
+ def test_commit_set_revid(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr", rev_id="some-rid")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ c = ra.RemoteAccess(self.repos_url)
+ self.assertEqual("3 some-rid\n",
+ c.get_dir("", c.get_latest_revnum())[2][SVN_PROP_BZR_REVISION_ID+"v3-none"])
+
+ def test_commit_check_rev_equal(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ rev1 = self.svndir.find_repository().get_revision(wt.branch.last_revision())
+ rev2 = self.bzrdir.find_repository().get_revision(wt.branch.last_revision())
+
+ self.assertEqual(rev1.committer, rev2.committer)
+ self.assertEqual(rev1.timestamp, rev2.timestamp)
+ self.assertEqual(rev1.timezone, rev2.timezone)
+ self.assertEqual(rev1.properties, rev2.properties)
+ self.assertEqual(rev1.message, rev2.message)
+ self.assertEqual(rev1.revision_id, rev2.revision_id)
+
+ def test_multiple(self):
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ self.build_tree({'dc/file': 'data2', 'dc/adir': None})
+ wt.add('adir')
+ wt.commit(message="Another commit from Bzr")
+
+ self.svndir.open_branch().pull(self.bzrdir.open_branch())
+
+ repos = self.svndir.find_repository()
+
+ mapping = repos.get_mapping()
+
+ self.assertEqual(repos.generate_revision_id(3, "", mapping),
+ self.svndir.open_branch().last_revision())
+
+ inv = repos.get_inventory(repos.generate_revision_id(2, "", mapping))
+ self.assertTrue(inv.has_filename('file'))
+ self.assertFalse(inv.has_filename('adir'))
+
+ inv = repos.get_inventory(repos.generate_revision_id(3, "", mapping))
+ self.assertTrue(inv.has_filename('file'))
+ self.assertTrue(inv.has_filename('adir'))
+
+ self.assertEqual(self.svndir.open_branch().revision_history(),
+ self.bzrdir.open_branch().revision_history())
+
+ self.assertEqual(wt.branch.last_revision(),
+ repos.generate_revision_id(3, "", mapping))
+ self.assertEqual(
+ wt.branch.repository.get_ancestry(wt.branch.last_revision()),
+ repos.get_ancestry(wt.branch.last_revision()))
+
+ def test_multiple_diverged(self):
+ oc_url = self.make_repository("o")
+
+ self.build_tree({'dc/file': 'data'})
+ wt = self.bzrdir.open_workingtree()
+ wt.add('file')
+ wt.commit(message="Commit from Bzr")
+
+ oc = self.get_commit_editor(oc_url)
+ oc.add_file("file").modify("data2")
+ oc.add_dir("adir")
+ oc.close()
+
+ self.assertRaises(DivergedBranches,
+ lambda: Branch.open(oc_url).pull(self.bzrdir.open_branch()))
+
+ def test_different_branch_path(self):
+ # A ,> C
+ # \ -> B /
+ dc = self.commit_editor()
+ trunk = dc.add_dir("trunk")
+ trunk.add_file('trunk/foo').modify("data")
+ dc.add_dir("branches")
+ dc.close()
+
+ dc = self.commit_editor()
+ branches = dc.open_dir('branches')
+ mybranch = branches.add_dir('branches/mybranch', 'trunk')
+ mybranch.open_file("branches/mybranch/foo").modify('data2')
+ dc.close()
+
+ self.svndir = BzrDir.open("%s/branches/mybranch" % self.repos_url)
+ os.mkdir("mybranch")
+ self.bzrdir = self.svndir.sprout("mybranch")
+
+ self.build_tree({'mybranch/foo': 'bladata'})
+ wt = self.bzrdir.open_workingtree()
+ revid = wt.commit(message="Commit from Bzr")
+ b = Branch.open("%s/trunk" % self.repos_url)
+ push(b.repository.get_graph(), b, wt.branch.repository,
+ wt.branch.revision_history()[-2])
+ mutter('log %r' % self.client_log("%s/trunk" % self.repos_url, 0, 4)[4][0])
+ self.assertEquals('M',
+ self.client_log("%s/trunk" % self.repos_url, 0, 4)[4][0]['/trunk'][0])
+ b = Branch.open("%s/trunk" % self.repos_url)
+ push(b.repository.get_graph(), b, wt.branch.repository, wt.branch.last_revision())
+ mutter('log %r' % self.client_log("%s/trunk" % self.repos_url, 0, 5)[5][0])
+ self.assertEquals("/branches/mybranch",
+ self.client_log("%s/trunk" % self.repos_url, 0, 5)[5][0]['/trunk'][1])
+
+class PushNewBranchTests(SubversionTestCase):
+ def test_single_revision(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/test': "Tour"})
+ bzrwt.add("test")
+ revid = bzrwt.commit("Do a commit")
+ newdir = BzrDir.open("%s/trunk" % repos_url)
+ newbranch = newdir.import_branch(bzrwt.branch)
+ newtree = newbranch.repository.revision_tree(revid)
+ bzrwt.lock_read()
+ self.assertEquals(bzrwt.inventory.root.file_id,
+ newtree.inventory.root.file_id)
+ bzrwt.unlock()
+ self.assertEquals(revid, newbranch.last_revision())
+ self.assertEquals([revid], newbranch.revision_history())
+
+ def test_single_revision_single_branch(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/test': "Tour"})
+ bzrwt.add("test")
+ revid = bzrwt.commit("Do a commit")
+
+ dc = self.get_commit_editor(repos_url)
+ some = dc.add_dir("some")
+ funny = some.add_dir("some/funny")
+ funny.add_dir("some/funny/branch")
+ dc.close()
+ newdir = BzrDir.open("%s/some/funny/branch/name" % repos_url)
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid, newbranch.last_revision())
+
+ # revision graph for the two tests below:
+ # svn-1
+ # |
+ # base
+ # | \
+ # diver svn2
+ # | /
+ # merge
+
+ def test_push_replace_existing_root(self):
+ repos_url = self.make_client("test", "svnco")
+ self.build_tree({'svnco/foo.txt': 'foo'})
+ self.client_add("svnco/foo.txt")
+ self.client_commit("svnco", "add file") #1
+ self.client_update("svnco")
+
+ os.mkdir('bzrco')
+ dir = BzrDir.open(repos_url).sprout("bzrco")
+ wt = dir.open_workingtree()
+ self.build_tree({'bzrco/bar.txt': 'bar'})
+ wt.add("bar.txt")
+ base_revid = wt.commit("add another file", rev_id="mybase")
+ wt.branch.push(Branch.open(repos_url))
+
+ self.build_tree({"svnco/baz.txt": "baz"})
+ self.client_add("svnco/baz.txt")
+ self.assertEquals(3,
+ self.client_commit("svnco", "add yet another file")[0])
+ self.client_update("svnco")
+
+ self.build_tree({"bzrco/qux.txt": "qux"})
+ wt.add("qux.txt")
+ wt.commit("add still more files", rev_id="mydiver")
+
+ repos = Repository.open(repos_url)
+ wt.branch.repository.fetch(repos)
+ mapping = repos.get_mapping()
+ other_rev = repos.generate_revision_id(3, "", mapping)
+ wt.lock_write()
+ try:
+ merge = Merger.from_revision_ids(DummyProgress(), wt, other=other_rev)
+ merge.merge_type = Merge3Merger
+ merge.do_merge()
+ self.assertEquals(base_revid, merge.base_rev_id)
+ merge.set_pending()
+ self.assertEquals([wt.last_revision(), other_rev], wt.get_parent_ids())
+ wt.commit("merge", rev_id="mymerge")
+ finally:
+ wt.unlock()
+ self.assertTrue(os.path.exists("bzrco/baz.txt"))
+ self.assertRaises(BzrError,
+ lambda: wt.branch.push(Branch.open(repos_url)))
+
+ def test_push_replace_existing_branch(self):
+ repos_url = self.make_client("test", "svnco")
+ self.build_tree({'svnco/trunk/foo.txt': 'foo'})
+ self.client_add("svnco/trunk")
+ self.client_commit("svnco", "add file") #1
+ self.client_update("svnco")
+
+ os.mkdir('bzrco')
+ dir = BzrDir.open(repos_url+"/trunk").sprout("bzrco")
+ wt = dir.open_workingtree()
+ self.build_tree({'bzrco/bar.txt': 'bar'})
+ wt.add("bar.txt")
+ base_revid = wt.commit("add another file", rev_id="mybase")
+ wt.branch.push(Branch.open(repos_url+"/trunk"))
+
+ self.build_tree({"svnco/trunk/baz.txt": "baz"})
+ self.client_add("svnco/trunk/baz.txt")
+ self.assertEquals(3,
+ self.client_commit("svnco", "add yet another file")[0])
+ self.client_update("svnco")
+
+ self.build_tree({"bzrco/qux.txt": "qux"})
+ wt.add("qux.txt")
+ wt.commit("add still more files", rev_id="mydiver")
+
+ repos = Repository.open(repos_url)
+ wt.branch.repository.fetch(repos)
+ mapping = repos.get_mapping()
+ other_rev = repos.generate_revision_id(3, "trunk", mapping)
+ wt.lock_write()
+ try:
+ merge = Merger.from_revision_ids(DummyProgress(), wt, other=other_rev)
+ merge.merge_type = Merge3Merger
+ merge.do_merge()
+ self.assertEquals(base_revid, merge.base_rev_id)
+ merge.set_pending()
+ self.assertEquals([wt.last_revision(), other_rev], wt.get_parent_ids())
+ wt.commit("merge", rev_id="mymerge")
+ finally:
+ wt.unlock()
+ self.assertTrue(os.path.exists("bzrco/baz.txt"))
+ wt.branch.push(Branch.open(repos_url+"/trunk"))
+
+ def test_push_merge_unchanged_file(self):
+ def check_tree(t):
+ self.assertEquals(base_revid, t.inventory[t.path2id("bar.txt")].revision)
+ self.assertEquals(other_revid, t.inventory[t.path2id("bar2.txt")].revision)
+ repos_url = self.make_repository("test")
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/foo.txt").modify("add file")
+ dc.close()
+
+ os.mkdir('bzrco1')
+ dir1 = BzrDir.open(repos_url+"/trunk").sprout("bzrco1")
+
+ os.mkdir('bzrco2')
+ dir2 = BzrDir.open(repos_url+"/trunk").sprout("bzrco2")
+
+ wt1 = dir1.open_workingtree()
+ self.build_tree({'bzrco1/bar.txt': 'bar'})
+ wt1.add("bar.txt")
+ base_revid = wt1.commit("add another file", rev_id="mybase")
+ wt1.branch.push(Branch.open(repos_url+"/trunk"))
+
+ wt2 = dir2.open_workingtree()
+ self.build_tree({'bzrco2/bar2.txt': 'bar'})
+ wt2.add("bar2.txt")
+ other_revid = wt2.commit("add yet another file", rev_id="side1")
+
+ wt1.lock_write()
+ try:
+ wt1.merge_from_branch(wt2.branch)
+ self.assertEquals([wt1.last_revision(), other_revid], wt1.get_parent_ids())
+ mergingrevid = wt1.commit("merge", rev_id="side2")
+ check_tree(wt1.branch.repository.revision_tree(mergingrevid))
+ finally:
+ wt1.unlock()
+ self.assertTrue(os.path.exists("bzrco1/bar2.txt"))
+ wt1.branch.push(Branch.open(repos_url+"/trunk"))
+ r = Repository.open(repos_url)
+ revmeta = r._revmeta_provider.get_revision("trunk", 3)
+ self.assertEquals({"bar2.txt": "side1"}, revmeta.get_text_revisions(r.get_mapping()))
+
+ os.mkdir("cpy")
+ cpy = BzrDir.create("cpy", format.get_rich_root_format())
+ cpyrepos = cpy.create_repository()
+ r.copy_content_into(cpyrepos)
+ check_tree(cpyrepos.revision_tree(mergingrevid))
+
+ t = r.revision_tree(mergingrevid)
+ check_tree(t)
+
+ def test_missing_prefix_error(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/test': "Tour"})
+ bzrwt.add("test")
+ revid = bzrwt.commit("Do a commit")
+ newdir = BzrDir.open(repos_url+"/foo/trunk")
+ self.assertRaises(MissingPrefix,
+ lambda: newdir.import_branch(bzrwt.branch))
+
+ def test_repeat(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/test': "Tour"})
+ bzrwt.add("test")
+ revid = bzrwt.commit("Do a commit")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid, newbranch.last_revision())
+ self.assertEquals([revid], newbranch.revision_history())
+ self.build_tree({'c/test': "Tour de France"})
+ bzrwt.commit("Do a commit")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ self.assertRaises(AlreadyBranchError, newdir.import_branch,
+ bzrwt.branch)
+
+ def test_multiple(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/test': "Tour"})
+ bzrwt.add("test")
+ revid1 = bzrwt.commit("Do a commit")
+ self.build_tree({'c/test': "Tour de France"})
+ revid2 = bzrwt.commit("Do a commit")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid2, newbranch.last_revision())
+ self.assertEquals([revid1, revid2], newbranch.revision_history())
+
+ def test_dato(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/foo.txt': "foo"})
+ bzrwt.add("foo.txt")
+ revid1 = bzrwt.commit("Do a commit",
+ committer=u"Adeodato Simó <dato@net.com.org.es>")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newdir.import_branch(bzrwt.branch)
+ self.assertEquals(u"Adeodato Simó <dato@net.com.org.es>",
+ Repository.open(repos_url).get_revision(revid1).committer)
+
+ def test_utf8_commit_msg(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/foo.txt': "foo"})
+ bzrwt.add("foo.txt")
+ revid1 = bzrwt.commit(u"Do á commït")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newdir.import_branch(bzrwt.branch)
+ self.assertEquals(u"Do á commït",
+ Repository.open(repos_url).get_revision(revid1).message)
+
+ def test_multiple_part_exists(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir('trunk')
+ trunk.add_file("trunk/myfile").modify("data")
+ dc.add_dir("branches")
+ dc.close()
+
+ svnrepos = Repository.open(repos_url)
+ os.mkdir("c")
+ bzrdir = BzrDir.open(repos_url+"/trunk").sprout("c")
+ bzrwt = bzrdir.open_workingtree()
+ self.build_tree({'c/myfile': "Tour"})
+ revid1 = bzrwt.commit("Do a commit")
+ self.build_tree({'c/myfile': "Tour de France"})
+ revid2 = bzrwt.commit("Do a commit")
+ newdir = BzrDir.open(repos_url+"/branches/mybranch")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid2, newbranch.last_revision())
+ mapping = svnrepos.get_mapping()
+ self.assertEquals([
+ svnrepos.generate_revision_id(1, "trunk", mapping)
+ , revid1, revid2], newbranch.revision_history())
+
+ def test_push_overwrite(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/bloe").modify("text")
+ dc.close()
+
+ os.mkdir("d1")
+ bzrdir = BzrDir.open(repos_url+"/trunk").sprout("d1")
+ bzrwt1 = bzrdir.open_workingtree()
+
+ os.mkdir("d2")
+ bzrdir = BzrDir.open(repos_url+"/trunk").sprout("d2")
+ bzrwt2 = bzrdir.open_workingtree()
+
+ self.build_tree({'d1/myfile': "Tour"})
+ bzrwt1.add("myfile")
+ revid1 = bzrwt1.commit("Do a commit")
+
+ self.build_tree({'d2/myfile': "France"})
+ bzrwt2.add("myfile")
+ revid2 = bzrwt2.commit("Do a commit")
+
+ bzrwt1.branch.push(Branch.open(repos_url+"/trunk"))
+ self.assertEquals(bzrwt1.branch.revision_history(),
+ Branch.open(repos_url+"/trunk").revision_history())
+
+ bzrwt2.branch.push(Branch.open(repos_url+"/trunk"), overwrite=True)
+
+ self.assertEquals(bzrwt2.branch.revision_history(),
+ Branch.open(repos_url+"/trunk").revision_history())
+
+ def test_push_overwrite_unrelated(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ trunk.add_file("trunk/bloe").modify("text")
+ dc.close()
+
+ os.mkdir("d1")
+ bzrdir = BzrDir.open(repos_url+"/trunk").sprout("d1")
+ bzrwt1 = bzrdir.open_workingtree()
+
+ bzrwt2 = BzrDir.create_standalone_workingtree("d2",
+ format=format.get_rich_root_format())
+
+ self.build_tree({'d1/myfile': "Tour"})
+ bzrwt1.add("myfile")
+ revid1 = bzrwt1.commit("Do a commit")
+
+ self.build_tree({'d2/myfile': "France"})
+ bzrwt2.add("myfile")
+ revid2 = bzrwt2.commit("Do a commit")
+
+ bzrwt1.branch.push(Branch.open(repos_url+"/trunk"))
+ self.assertEquals(bzrwt1.branch.revision_history(),
+ Branch.open(repos_url+"/trunk").revision_history())
+
+ bzrwt2.branch.push(Branch.open(repos_url+"/trunk"), overwrite=True)
+
+ self.assertEquals(bzrwt2.branch.revision_history(),
+ Branch.open(repos_url+"/trunk").revision_history())
+
+
+
+ def test_complex_rename(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/registry/generic.c': "Tour"})
+ bzrwt.add("registry")
+ bzrwt.add("registry/generic.c")
+ revid1 = bzrwt.commit("Add initial directory + file")
+ bzrwt.rename_one("registry", "registry.moved")
+ os.unlink("c/registry.moved/generic.c")
+ bzrwt.remove("registry.moved/generic.c")
+ self.build_tree({'c/registry/generic.c': "bla"})
+ bzrwt.add("registry")
+ bzrwt.add("registry/generic.c")
+ revid2 = bzrwt.commit("Do some funky things")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid2, newbranch.last_revision())
+ self.assertEquals([revid1, revid2], newbranch.revision_history())
+ tree = newbranch.repository.revision_tree(revid2)
+ mutter("inventory: %r" % tree.inventory.entries())
+ delta = tree.changes_from(bzrwt)
+ self.assertFalse(delta.has_changed())
+ self.assertTrue(tree.inventory.has_filename("registry"))
+ self.assertTrue(tree.inventory.has_filename("registry.moved"))
+ self.assertTrue(tree.inventory.has_filename("registry/generic.c"))
+ self.assertFalse(tree.inventory.has_filename("registry.moved/generic.c"))
+ os.mkdir("n")
+ BzrDir.open(repos_url+"/trunk").sprout("n")
+
+ def test_rename_dir_changing_contents(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/registry/generic.c': "Tour"})
+ bzrwt.add("registry", "dirid")
+ bzrwt.add("registry/generic.c", "origid")
+ revid1 = bzrwt.commit("Add initial directory + file")
+ bzrwt.rename_one("registry/generic.c", "registry/c.c")
+ self.build_tree({'c/registry/generic.c': "Tour2"})
+ bzrwt.add("registry/generic.c", "newid")
+ revid2 = bzrwt.commit("Other change")
+ bzrwt.rename_one("registry", "registry.moved")
+ revid3 = bzrwt.commit("Rename")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ def check(b):
+ self.assertEquals([revid1, revid2, revid3], b.revision_history())
+ tree = b.repository.revision_tree(revid3)
+ self.assertEquals("origid", tree.path2id("registry.moved/c.c"))
+ self.assertEquals("newid", tree.path2id("registry.moved/generic.c"))
+ self.assertEquals("dirid", tree.path2id("registry.moved"))
+ check(newbranch)
+ os.mkdir("n")
+ BzrDir.open(repos_url+"/trunk").sprout("n")
+ copybranch = Branch.open("n")
+ check(copybranch)
+
+ def test_rename_dir(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/registry/generic.c': "Tour"})
+ bzrwt.add("registry", "dirid")
+ bzrwt.add("registry/generic.c", "origid")
+ revid1 = bzrwt.commit("Add initial directory + file")
+ bzrwt.rename_one("registry", "registry.moved")
+ revid2 = bzrwt.commit("Rename")
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ def check(b):
+ self.assertEquals([revid1, revid2], b.revision_history())
+ tree = b.repository.revision_tree(revid2)
+ self.assertEquals("origid", tree.path2id("registry.moved/generic.c"))
+ self.assertEquals("dirid", tree.path2id("registry.moved"))
+ check(newbranch)
+ os.mkdir("n")
+ BzrDir.open(repos_url+"/trunk").sprout("n")
+ copybranch = Branch.open("n")
+ check(copybranch)
+
+ def test_push_non_lhs_parent(self):
+ from bzrlib.debug import debug_flags
+ debug_flags.add("commit")
+ debug_flags.add("fetch")
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/registry/generic.c': "Tour"})
+ bzrwt.add("registry")
+ bzrwt.add("registry/generic.c")
+ revid1 = bzrwt.commit("Add initial directory + file",
+ rev_id="initialrevid")
+
+ # Push first branch into Subversion
+ newdir = BzrDir.open(repos_url+"/trunk")
+ mapping = newdir.find_repository().get_mapping()
+ newbranch = newdir.import_branch(bzrwt.branch)
+
+ # Should create dc/trunk
+
+ dc = self.get_commit_editor(repos_url)
+ branches = dc.add_dir("branches")
+ branches.add_dir('branches/foo', 'trunk')
+ dc.close()
+
+ dc = self.get_commit_editor(repos_url)
+ branches = dc.open_dir("branches")
+ foo = branches.open_dir("branches/foo")
+ registry = foo.open_dir("branches/foo/registry")
+ registry.open_file("branches/foo/registry/generic.c").modify("France")
+ dc.close()
+
+ r = ra.RemoteAccess(repos_url)
+ merge_revno = r.get_latest_revnum()
+ merge_revid = newdir.find_repository().generate_revision_id(merge_revno, "branches/foo", mapping)
+
+ self.build_tree({'c/registry/generic.c': "de"})
+ revid2 = bzrwt.commit("Change something", rev_id="changerevid")
+
+ # Merge
+ self.build_tree({'c/registry/generic.c': "France"})
+ bzrwt.add_pending_merge(merge_revid)
+ revid3 = bzrwt.commit("Merge something", rev_id="mergerevid")
+
+ trunk = Branch.open(repos_url + "/branches/foo")
+ trunk.pull(bzrwt.branch)
+
+ self.assertEquals([revid1, revid2, revid3], trunk.revision_history())
+ self.assertEquals(
+ '1 initialrevid\n2 changerevid\n3 mergerevid\n',
+ self.client_get_prop(repos_url+"/branches/foo", SVN_PROP_BZR_REVISION_ID+"v3-trunk0", r.get_latest_revnum()))
+
+ def test_complex_replace_dir(self):
+ repos_url = self.make_repository("a")
+ bzrwt = BzrDir.create_standalone_workingtree("c",
+ format=format.get_rich_root_format())
+ self.build_tree({'c/registry/generic.c': "Tour"})
+ bzrwt.add(["registry"], ["origdir"])
+ bzrwt.add(["registry/generic.c"], ["file"])
+ revid1 = bzrwt.commit("Add initial directory + file")
+
+ bzrwt.remove('registry/generic.c')
+ bzrwt.remove('registry')
+ bzrwt.add(["registry"], ["newdir"])
+ bzrwt.add(["registry/generic.c"], ["file"])
+ revid2 = bzrwt.commit("Do some funky things")
+
+ newdir = BzrDir.open(repos_url+"/trunk")
+ newbranch = newdir.import_branch(bzrwt.branch)
+ self.assertEquals(revid2, newbranch.last_revision())
+ self.assertEquals([revid1, revid2], newbranch.revision_history())
+
+ os.mkdir("n")
+ BzrDir.open(repos_url+"/trunk").sprout("n")
+
+
+
+class TestPushTwice(SubversionTestCase):
+ def test_push_twice(self):
+ # bug 208566
+ repos_url = self.make_repository('d')
+
+ dc = self.get_commit_editor(repos_url)
+ trunk = dc.add_dir("trunk")
+ foo = trunk.add_dir("trunk/foo")
+ foo.add_file("trunk/foo/bla").modify("data")
+ dc.add_dir("branches")
+ dc.close()
+
+ svndir = BzrDir.open(repos_url+"/trunk")
+ os.mkdir("dc")
+ bzrdir = svndir.sprout("dc")
+ wt = bzrdir.open_workingtree()
+ revid = wt.commit(message="Commit from Bzr")
+ expected_history = wt.branch.revision_history()
+
+ svndir1 = BzrDir.open(repos_url+"/branches/a")
+ svndir1.import_branch(wt.branch)
+ self.assertEquals(expected_history, svndir1.open_branch().revision_history())
+
+ svndir2 = BzrDir.open(repos_url+"/branches/b")
+ svndir2.import_branch(wt.branch)
+ self.assertEquals(expected_history, svndir2.open_branch().revision_history())
+
+ revid = wt.commit(message="Commit from Bzr")
+ expected_history = wt.branch.revision_history()
+
+ svndir1.open_branch().pull(wt.branch)
+ self.assertEquals(expected_history, svndir1.open_branch().revision_history())
+ svndir2.open_branch().pull(wt.branch)
+ self.assertEquals(expected_history, svndir2.open_branch().revision_history())
--- /dev/null
- from bzrlib.plugins.svn.errors import RebaseNotPresent
+# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""Mapping upgrade tests."""
+
+from bzrlib.bzrdir import BzrDir
+from bzrlib.repository import Repository
+from bzrlib.tests import TestCase, TestSkipped
+
- from bzrlib.plugins.foreign.upgrade import (upgrade_repository, upgrade_branch,
- upgrade_workingtree, UpgradeChangesContent,
+from bzrlib.plugins.svn.format import get_rich_root_format
++from bzrlib.plugins.svn.mapping import mapping_registry
+from bzrlib.plugins.svn.mapping3 import BzrSvnMappingv3FileProps
+from bzrlib.plugins.svn.mapping3.scheme import TrunkBranchingScheme
+from bzrlib.plugins.svn.tests import SubversionTestCase
- upgrade_repository(newrepos, oldrepos, mapping_registry=mapping_registry, allow_changes=True)
-
++from bzrlib.plugins.svn.foreign.upgrade import (upgrade_repository, upgrade_branch,
++ upgrade_workingtree, UpgradeChangesContent, RebaseNotPresent,
+ create_upgraded_revid, generate_upgrade_map)
+
+class TestUpgradeChangesContent(TestCase):
+ def test_init(self):
+ x = UpgradeChangesContent("revisionx")
+ self.assertEqual("revisionx", x.revid)
+
+
+class ParserTests(TestCase):
+ def test_create_upgraded_revid_new(self):
+ self.assertEqual("bla-svn3-upgrade",
+ create_upgraded_revid("bla", "-svn3"))
+
+ def test_create_upgraded_revid_upgrade(self):
+ self.assertEqual("bla-svn3-upgrade",
+ create_upgraded_revid("bla-svn1-upgrade", "-svn3"))
+
+
+def skip_no_rebase(unbound):
+ def check_error(self, *args, **kwargs):
+ try:
+ return unbound(self, *args, **kwargs)
+ except RebaseNotPresent, e:
+ raise TestSkipped(e)
+ check_error.__doc__ = unbound.__doc__
+ check_error.__name__ = unbound.__name__
+ return check_error
+
+
+class UpgradeTests(SubversionTestCase):
+ @skip_no_rebase
+ def test_no_custom(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a","w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+
+ self.assertTrue(newrepos.has_revision("svn-v1:1@%s-" % oldrepos.uuid))
+
- upgrade_repository(newrepos, oldrepos, mapping_registry=mapping_registry, allow_changes=True)
-
+ mapping = oldrepos.get_mapping()
++ upgrade_repository(newrepos, oldrepos, new_mapping=mapping, mapping_registry=mapping_registry, allow_changes=True)
++
+ self.assertTrue(newrepos.has_revision(oldrepos.generate_revision_id(1, "", mapping)))
+
+ @skip_no_rebase
+ def test_single_custom(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ file("f/a", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+
- upgrade_repository(newrepos, oldrepos, mapping_registry=mapping_registry, allow_changes=True)
+ mapping = oldrepos.get_mapping()
++ upgrade_repository(newrepos, oldrepos, new_mapping=mapping, mapping_registry=mapping_registry, allow_changes=True)
++
+ self.assertTrue(newrepos.has_revision(oldrepos.generate_revision_id(1, "", mapping)))
+ self.assertTrue(newrepos.has_revision("customrev%s-upgrade" % mapping.upgrade_suffix))
+ newrepos.lock_read()
+ self.assertTrue((oldrepos.generate_revision_id(1, "", mapping),),
+ tuple(newrepos.get_revision("customrev%s-upgrade" % mapping.upgrade_suffix).parent_ids))
+ newrepos.unlock()
+
+ @skip_no_rebase
+ def test_single_keep_parent_fileid(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add(["a"], ["someid"])
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ wt.rename_one("a", "b")
+ file("f/a", 'w').write("moredata")
+ wt.add(["a"], ["specificid"])
+ wt.commit(message='fix moredata', rev_id="customrev")
+
- upgrade_repository(newrepos, oldrepos, mapping_registry=mapping_registry,
+ mapping = oldrepos.get_mapping()
++ upgrade_repository(newrepos, oldrepos, new_mapping=mapping, mapping_registry=mapping_registry, allow_changes=True)
+ tree = newrepos.revision_tree("customrev%s-upgrade" % mapping.upgrade_suffix)
+ self.assertEqual("specificid", tree.inventory.path2id("a"))
+ self.assertEqual(mapping.generate_file_id(oldrepos.uuid, 1, "", u"a"),
+ tree.inventory.path2id("b"))
+
+ @skip_no_rebase
+ def test_single_custom_continue(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.add_file("b").modify("c")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ oldrepos.copy_content_into(newrepos)
+ dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ file("f/b", "w").write("c")
+ wt.add("a")
+ wt.add("b")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ file("f/a", 'w').write("moredata")
+ file("f/b", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+
+ tree = newrepos.revision_tree("svn-v1:1@%s-" % oldrepos.uuid)
+
+ newrepos.lock_write()
+ newrepos.start_write_group()
+
+ mapping = oldrepos.get_mapping()
+ fileid = tree.inventory.path2id("a")
+ revid = "customrev%s-upgrade" % mapping.upgrade_suffix
+ newrepos.texts.add_lines((fileid, revid),
+ [(fileid, "svn-v1:1@%s-" % oldrepos.uuid)],
+ tree.get_file(fileid).readlines())
+
+ newrepos.commit_write_group()
+ newrepos.unlock()
+
- renames = upgrade_repository(newrepos, oldrepos, mapping_registry=mapping_registry,
++ upgrade_repository(newrepos, oldrepos, new_mapping=mapping, mapping_registry=mapping_registry,
+ allow_changes=True)
+
+ self.assertTrue(newrepos.has_revision(oldrepos.generate_revision_id(1, "", mapping)))
+ self.assertTrue(newrepos.has_revision("customrev%s-upgrade" % mapping.upgrade_suffix))
+ newrepos.lock_read()
+ self.assertTrue((oldrepos.generate_revision_id(1, "", mapping),),
+ tuple(newrepos.get_revision("customrev%s-upgrade" % mapping.upgrade_suffix).parent_ids))
+ newrepos.unlock()
+
+ @skip_no_rebase
+ def test_more_custom(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ file("f/a", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+ file("f/a", 'w').write("blackfield")
+ wt.commit(message='fix it again', rev_id="anotherrev")
+
+ mapping = oldrepos.get_mapping()
- upgrade_branch(b, oldrepos, mapping_registry=mapping_registry, allow_changes=True)
++ renames = upgrade_repository(newrepos, oldrepos, new_mapping=mapping, mapping_registry=mapping_registry,
+ allow_changes=True)
+ self.assertEqual({
+ 'svn-v1:1@%s-' % oldrepos.uuid: 'svn-v3-none:%s::1' % oldrepos.uuid,
+ "customrev": "customrev%s-upgrade" % mapping.upgrade_suffix,
+ "anotherrev": "anotherrev%s-upgrade" % mapping.upgrade_suffix},
+ renames)
+
+ self.assertTrue(newrepos.has_revision(oldrepos.generate_revision_id(1, "", mapping)))
+ self.assertTrue(newrepos.has_revision("customrev%s-upgrade" % mapping.upgrade_suffix))
+ self.assertTrue(newrepos.has_revision("anotherrev%s-upgrade" % mapping.upgrade_suffix))
+ newrepos.lock_read()
+ self.assertTrue((oldrepos.generate_revision_id(1, "", mapping),),
+ tuple(newrepos.get_revision("customrev%s-upgrade" % mapping.upgrade_suffix).parent_ids))
+ self.assertTrue(("customrev-%s-upgrade" % mapping.upgrade_suffix,),
+ tuple(newrepos.get_revision("anotherrev%s-upgrade" % mapping.upgrade_suffix).parent_ids))
+ newrepos.unlock()
+
+ @skip_no_rebase
+ def test_more_custom_branch(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ b = dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ file("f/a", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+ file("f/a", 'w').write("blackfield")
+ wt.commit(message='fix it again', rev_id="anotherrev")
+
- upgrade_workingtree(wt, oldrepos, mapping_registry, allow_changes=True)
++ upgrade_branch(b, oldrepos, new_mapping=oldrepos.get_mapping(), mapping_registry=mapping_registry, allow_changes=True)
+ mapping = oldrepos.get_mapping()
+ self.assertEqual([oldrepos.generate_revision_id(0, "", mapping),
+ oldrepos.generate_revision_id(1, "", mapping),
+ "customrev%s-upgrade" % mapping.upgrade_suffix,
+ "anotherrev%s-upgrade" % mapping.upgrade_suffix
+ ], b.revision_history())
+
+ @skip_no_rebase
+ def test_workingtree(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ newrepos = dir.create_repository()
+ b = dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+ file("f/a", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+ file("f/a", 'w').write("blackfield")
+ wt.commit(message='fix it again', rev_id="anotherrev")
+
+ mapping = oldrepos.get_mapping()
- upgrade_branch(b, oldrepos, mapping_registry=mapping_registry)
++ upgrade_workingtree(wt, oldrepos, new_mapping=mapping,
++ mapping_registry=mapping_registry, allow_changes=True)
+ self.assertEquals(wt.last_revision(), b.last_revision())
+ self.assertEqual([oldrepos.generate_revision_id(0, "", mapping),
+ oldrepos.generate_revision_id(1, "", mapping),
+ "customrev%s-upgrade" % mapping.upgrade_suffix,
+ "anotherrev%s-upgrade" % mapping.upgrade_suffix
+ ], b.revision_history())
+
+ @skip_no_rebase
+ def test_branch_none(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("a").modify("b")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ dir.create_repository()
+ b = dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("b")
+ wt.add("a")
+ wt.commit(message="data", rev_id="blarev")
+ file("f/a", 'w').write("moredata")
+ wt.commit(message='fix moredata', rev_id="customrev")
+ file("f/a", 'w').write("blackfield")
+ wt.commit(message='fix it again', rev_id="anotherrev")
+
- self.assertRaises(UpgradeChangesContent, lambda: upgrade_branch(b, oldrepos, mapping_registry=mapping_registry))
++ upgrade_branch(b, oldrepos, new_mapping=oldrepos.get_mapping(), mapping_registry=mapping_registry)
+ self.assertEqual(["blarev", "customrev", "anotherrev"],
+ b.revision_history())
+
+ @skip_no_rebase
+ def test_raise_incompat(self):
+ repos_url = self.make_repository("a")
+
+ dc = self.get_commit_editor(repos_url)
+ dc.add_file("d").modify("e")
+ dc.close()
+
+ oldrepos = Repository.open(repos_url)
+ dir = BzrDir.create("f", format=get_rich_root_format())
+ dir.create_repository()
+ b = dir.create_branch()
+ wt = dir.create_workingtree()
+ file("f/a", "w").write("c")
+ wt.add("a")
+ wt.commit(message="data", rev_id="svn-v1:1@%s-" % oldrepos.uuid)
+
++ self.assertRaises(UpgradeChangesContent, lambda: upgrade_branch(b, oldrepos, new_mapping=oldrepos.get_mapping(), mapping_registry=mapping_registry))
+
+
+class TestGenerateUpdateMapTests(TestCase):
+ def test_nothing(self):
+ self.assertEquals({}, generate_upgrade_map(BzrSvnMappingv3FileProps(TrunkBranchingScheme()), ["bla", "bloe"]))
+
+ def test_v2_to_v3(self):
+ self.assertEquals({"svn-v2:12@65390229-12b7-0310-b90b-f21a5aa7ec8e-trunk": "svn-v3-trunk0:65390229-12b7-0310-b90b-f21a5aa7ec8e:trunk:12"}, generate_upgrade_map(BzrSvnMappingv3FileProps(TrunkBranchingScheme()), ["svn-v2:12@65390229-12b7-0310-b90b-f21a5aa7ec8e-trunk", "bloe", "blaaa"]))