return svn.core.svn_auth_open(providers, pool)
+def create_svn_client(pool):
+ client = svn.client.create_context(pool)
+ client.auth_baton = _create_auth_baton(pool)
+ client.config = svn_config
+ return client
+
+
# Don't run any tests on SvnTransport as it is not intended to be
# a full implementation of Transport
def get_test_permutations():
*args, **kwargs)
@convert_svn_error
- def change_dir_prop(self, baton, *args, **kwargs):
+ def change_dir_prop(self, baton, name, value, pool=None):
assert self.recent_baton[-1] == baton
return svn.delta.editor_invoke_change_dir_prop(self.editor, baton,
- *args, **kwargs)
+ name, value, pool)
@convert_svn_error
def delete_entry(self, *args, **kwargs):
return baton
@convert_svn_error
- def change_file_prop(self, baton, *args, **kwargs):
+ def change_file_prop(self, baton, name, value, pool=None):
assert self.recent_baton[-1] == baton
- svn.delta.editor_invoke_change_file_prop(self.editor, baton, *args,
- **kwargs)
+ svn.delta.editor_invoke_change_file_prop(self.editor, baton, name,
+ value, pool)
@convert_svn_error
def close_file(self, baton, *args, **kwargs):
self._backing_url = _backing_url.rstrip("/")
Transport.__init__(self, bzr_url)
- self._client = svn.client.create_context(self.pool)
- self._client.auth_baton = _create_auth_baton(self.pool)
- self._client.config = svn_config
-
+ self._client = create_svn_client(self.pool)
try:
self.mutter('opening SVN RA connection to %r' % self._backing_url)
self._ra = svn.client.open_ra_session(self._backing_url.encode('utf8'),
self.mutter('svn get-uuid')
return svn.ra.get_uuid(self._ra)
+ def get_repos_root(self):
+ root = self.get_svn_repos_root()
+ if (self.base.startswith("svn+http:") or
+ self.base.startswith("svn+https:")):
+ return "svn+%s" % root
+ return root
+
@convert_svn_error
@needs_busy
- def get_repos_root(self):
+ def get_svn_repos_root(self):
if self._root is None:
self.mutter("svn get-repos-root")
self._root = svn.ra.get_repos_root(self._ra)
self.mutter("svn get-latest-revnum")
return svn.ra.get_latest_revnum(self._ra)
+ def _make_editor(self, editor, pool=None):
+ edit, edit_baton = svn.delta.make_editor(editor, pool)
+ self._edit = edit
+ self._edit_baton = edit_baton
+ return self._edit, self._edit_baton
+
@convert_svn_error
- def do_switch(self, switch_rev, recurse, switch_url, *args, **kwargs):
- assert self._backing_url == self.svn_url, "backing url invalid: %r != %r" % (self._backing_url, self.svn_url)
+ def do_switch(self, switch_rev, recurse, switch_url, editor, pool=None):
+ self._open_real_transport()
self.mutter('svn switch -r %d -> %r' % (switch_rev, switch_url))
self._mark_busy()
- return self.Reporter(self, svn.ra.do_switch(self._ra, switch_rev, "", recurse, switch_url, *args, **kwargs))
+ edit, edit_baton = self._make_editor(editor, pool)
+ return self.Reporter(self, svn.ra.do_switch(self._ra, switch_rev, "",
+ recurse, switch_url, edit, edit_baton, pool))
@convert_svn_error
@needs_busy
def get_log(self, path, from_revnum, to_revnum, *args, **kwargs):
self.mutter('svn log %r:%r %r' % (from_revnum, to_revnum, path))
- return svn.ra.get_log(self._ra, [self._request_path(path)], from_revnum, to_revnum, *args, **kwargs)
+ return svn.ra.get_log(self._ra, [self._request_path(path)],
+ from_revnum, to_revnum, *args, **kwargs)
+
+ def _open_real_transport(self):
+ if self._backing_url != self.svn_url:
+ self.reparent(self.base)
+ assert self._backing_url == self.svn_url
def reparent_root(self):
if self._is_http_transport():
- self.svn_url = self.base = self.get_repos_root()
+ self.svn_url = self.get_svn_repos_root()
+ self.base = self.get_repos_root()
else:
self.reparent(self.get_repos_root())
+ @convert_svn_error
+ def change_rev_prop(self, revnum, name, value, pool=None):
+ svn.ra.change_rev_prop(self._ra, revnum, name, value)
+
@convert_svn_error
@needs_busy
def reparent(self, url):
url = url.rstrip("/")
- if url == self.svn_url:
- return
self.base = url
- self.svn_url = url
- self._backing_url = url
+ self.svn_url = bzr_to_svn_url(url)
+ if self.svn_url == self._backing_url:
+ return
if hasattr(svn.ra, 'reparent'):
self.mutter('svn reparent %r' % url)
- svn.ra.reparent(self._ra, url, self.pool)
+ svn.ra.reparent(self._ra, self.svn_url, self.pool)
else:
self.mutter('svn reparent (reconnect) %r' % url)
self._ra = svn.client.open_ra_session(self.svn_url.encode('utf8'),
self._client, self.pool)
+ self._backing_url = self.svn_url
@convert_svn_error
@needs_busy
def get_dir(self, path, revnum, pool=None, kind=False):
self.mutter("svn ls -r %d '%r'" % (revnum, path))
+ assert len(path) == 0 or path[0] != "/"
path = self._request_path(path)
# ra_dav backends fail with strange errors if the path starts with a
# slash while other backends don't.
- assert len(path) == 0 or path[0] != "/"
if hasattr(svn.ra, 'get_dir2'):
fields = 0
if kind:
return svn.ra.get_dir(self._ra, path, revnum)
def _request_path(self, relpath):
- if self._backing_url != self.svn_url:
- relpath = urlutils.join(
- urlutils.relative_url(self._backing_url, self.svn_url),
- relpath)
- relpath = relpath.rstrip("/")
- return relpath
+ if self._backing_url == self.svn_url:
+ return relpath
+ newrelpath = urlutils.join(
+ urlutils.relative_url(self._backing_url+"/", self.svn_url+"/"),
+ relpath).rstrip("/")
+ self.mutter('request path %r -> %r' % (relpath, newrelpath))
+ return newrelpath
@convert_svn_error
def list_dir(self, relpath):
@convert_svn_error
@needs_busy
def check_path(self, path, revnum, *args, **kwargs):
- path = self._request_path(path)
assert len(path) == 0 or path[0] != "/"
+ path = self._request_path(path)
self.mutter("svn check_path -r%d %s" % (revnum, path))
return svn.ra.check_path(self._ra, path.encode('utf-8'), revnum, *args, **kwargs)
raise
@convert_svn_error
- def do_update(self, revnum, *args, **kwargs):
- assert self._backing_url == self.svn_url, "backing url invalid: %r != %r" % (self._backing_url, self.svn_url)
+ def replay(self, revision, low_water_mark, send_deltas, editor, pool=None):
+ self._open_real_transport()
+ self.mutter('svn replay -r%r:%r' % (low_water_mark, revision))
+ self._mark_busy()
+ edit, edit_baton = self._make_editor(editor, pool)
+ svn.ra.replay(self._ra, revision, low_water_mark, send_deltas,
+ edit, edit_baton, pool)
+
+ @convert_svn_error
+ def do_update(self, revnum, recurse, editor, pool=None):
+ self._open_real_transport()
self.mutter('svn update -r %r' % revnum)
self._mark_busy()
- return self.Reporter(self, svn.ra.do_update(self._ra, revnum, "", *args, **kwargs))
+ edit, edit_baton = self._make_editor(editor, pool)
+ return self.Reporter(self, svn.ra.do_update(self._ra, revnum, "",
+ recurse, edit, edit_baton, pool))
+
+ def supports_custom_revprops(self):
+ return has_attr(svn.ra, 'get_commit_editor3')
@convert_svn_error
- def get_commit_editor(self, *args, **kwargs):
- assert self._backing_url == self.svn_url, "backing url invalid: %r != %r" % (self._backing_url, self.svn_url)
+ def get_commit_editor(self, revprops, done_cb, lock_token, keep_locks):
+ self._open_real_transport()
self._mark_busy()
- return Editor(self, svn.ra.get_commit_editor(self._ra, *args, **kwargs))
+ if revprops.keys() == [svn.core.SVN_PROP_REVISION_LOG]:
+ editor = svn.ra.get_commit_editor(self._ra,
+ revprops[svn.core.SVN_PROP_REVISION_LOG],
+ done_cb, lock_token, keep_locks)
+ else:
+ editor = svn.ra.get_commit_editor3(self._ra, revprops, done_cb,
+ lock_token, keep_locks)
+ return Editor(self, editor)
def listable(self):
"""See Transport.listable().