1 # Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published by
5 # the Free Software Foundation; either version 2.1 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 """Subversion ra library tests."""
18 from io import BytesIO
20 from subvertpy import (
21 NODE_DIR, NODE_NONE, NODE_UNKNOWN,
25 from subvertpy.tests import (
class VersionTest(TestCase):
    """Sanity checks on the version sequences reported by the ra module."""

    def test_version_length(self):
        """ra.version() has exactly four components."""
        self.assertEqual(4, len(ra.version()))

    def test_api_version_length(self):
        """ra.api_version() has exactly four components."""
        self.assertEqual(4, len(ra.api_version()))

    def test_api_version_later_same(self):
        """The API version is never greater than the library version."""
        # assertLessEqual prints both operands when the check fails,
        # whereas assertTrue on a pre-computed boolean only reports
        # "False is not true".
        self.assertLessEqual(ra.api_version(), ra.version())
class TestRemoteAccessUnknown(TestCase):
    """Constructing a RemoteAccess with a URL that cannot be opened."""

    def test_unknown_url(self):
        """RemoteAccess("bla://") raises SubversionException."""
        with self.assertRaises(SubversionException):
            ra.RemoteAccess("bla://")
48 class TestRemoteAccess(SubversionTestCase):
51 super(TestRemoteAccess, self).setUp()
52 self.repos_url = self.make_repository("d")
53 self.ra = ra.RemoteAccess(self.repos_url,
54 auth=ra.Auth([ra.get_username_provider()]))
58 super(TestRemoteAccess, self).tearDown()
def commit_editor(self):
    """Return a commit editor for the repository under test.

    Delegates to ``self.get_commit_editor`` with ``self.repos_url``.
    """
    target_url = self.repos_url
    editor = self.get_commit_editor(target_url)
    return editor
64 dc = self.get_commit_editor(self.repos_url)
69 self.assertEqual("RemoteAccess(\"%s\")" % self.repos_url,
def test_latest_revnum(self):
    """The latest revision number of the session is 0 here."""
    latest = self.ra.get_latest_revnum()
    self.assertEqual(0, latest)
75 def test_latest_revnum_one(self):
77 self.assertEqual(1, self.ra.get_latest_revnum())
def test_get_uuid(self):
    """get_uuid() returns a value of length 36."""
    uuid = self.ra.get_uuid()
    self.assertEqual(36, len(uuid))
def test_get_repos_root(self):
    """get_repos_root() equals the URL the session was opened with."""
    expected_root = self.repos_url
    actual_root = self.ra.get_repos_root()
    self.assertEqual(expected_root, actual_root)
85 def test_get_url(self):
86 if ra.api_version() < (1, 5):
87 self.assertRaises(NotImplementedError, self.ra.get_url)
89 self.assertEqual(self.repos_url, self.ra.get_url())
def test_reparent(self):
    """reparent() to the session's own URL completes without raising."""
    # There is no assertion: the test passes as long as the call succeeds.
    same_url = self.repos_url
    self.ra.reparent(same_url)
94 def test_has_capability(self):
95 if ra.api_version() < (1, 5):
96 self.assertRaises(NotImplementedError, self.ra.has_capability, "FOO")
98 self.assertRaises(SubversionException, self.ra.has_capability, "FOO")
def test_get_dir(self):
    """get_dir("", 0) returns a tuple."""
    path, revnum = "", 0
    listing = self.ra.get_dir(path, revnum)
    self.assertIsInstance(listing, tuple)
def test_get_dir_leading_slash(self):
    """get_dir() also accepts "/" as the path and returns a tuple."""
    path, revnum = "/", 0
    listing = self.ra.get_dir(path, revnum)
    self.assertIsInstance(listing, tuple)
108 def test_get_dir_kind(self):
110 (dirents, fetch_rev, props) = self.ra.get_dir("/", 1, fields=ra.DIRENT_KIND)
111 self.assertIsInstance(props, dict)
112 self.assertEqual(1, fetch_rev)
113 self.assertEqual(NODE_DIR, dirents["foo"]["kind"])
115 def test_change_rev_prop(self):
117 self.ra.change_rev_prop(1, "foo", "bar")
def test_rev_proplist(self):
    """rev_proplist(0) returns a dict."""
    revprops = self.ra.rev_proplist(0)
    self.assertIsInstance(revprops, dict)
122 def test_do_diff(self):
126 def change_prop(self, name, val): pass
127 def close(self, checksum=None): pass
130 def change_prop(self, name, val): pass
131 def add_directory(self, *args): return MyDirEditor()
132 def add_file(self, *args): return MyFileEditor()
133 def close(self): pass
136 def set_target_revision(self, rev): pass
137 def open_root(self, base_rev):
139 def close(self): pass
140 reporter = self.ra.do_diff(1, "", self.ra.get_repos_root(), MyEditor())
141 reporter.set_path("", 0, True)
143 self.assertRaises(RuntimeError, reporter.finish)
144 self.assertRaises(RuntimeError, reporter.set_path, "", 0, True)
def test_iter_log_invalid(self):
    """Consuming iter_log() for invalid queries raises SubversionException.

    Two queries are checked in order: a path named "idontexist" at
    revision 0, and the root path over the revision range 0..1000.
    """
    wanted_revprops = ("svn:date", "svn:author", "svn:log")
    # Each entry is (paths, start, end).
    bad_queries = (
        (["idontexist"], 0, 0),
        ([""], 0, 1000),
    )
    for paths, start, end in bad_queries:
        # The iterator is created inline so that no local name keeps it
        # alive once assertRaises has finished with it.
        self.assertRaises(
            SubversionException, list,
            self.ra.iter_log(paths, start, end,
                             revprops=list(wanted_revprops)))
152 def test_iter_log(self):
153 def check_results(returned):
154 self.assertEqual(2, len(returned))
155 self.assertTrue(len(returned[0]) in (3,4))
156 if len(returned[0]) == 3:
157 (paths, revnum, props) = returned[0]
159 (paths, revnum, props, has_children) = returned[0]
160 self.assertEqual(None, paths)
161 self.assertEqual(revnum, 0)
162 self.assertEqual(["svn:date"], list(props.keys()))
163 if len(returned[1]) == 3:
164 (paths, revnum, props) = returned[1]
166 (paths, revnum, props, has_children) = returned[1]
167 if ra.api_version() < (1, 6):
168 self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, paths)
170 self.assertEqual({'/foo': ('A', None, -1, NODE_DIR)}, paths)
171 self.assertEqual(revnum, 1)
172 self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
174 returned = list(self.ra.iter_log([""], 0, 0,
175 revprops=["svn:date", "svn:author", "svn:log"]))
176 self.assertEqual(1, len(returned))
178 returned = list(self.ra.iter_log(None, 0, 1, discover_changed_paths=True,
179 strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"]))
180 check_results(returned)
182 def test_get_log(self):
185 returned.append(args)
186 def check_results(returned):
187 self.assertEqual(2, len(returned))
188 self.assertTrue(len(returned[0]) in (3,4))
189 if len(returned[0]) == 3:
190 (paths, revnum, props) = returned[0]
192 (paths, revnum, props, has_children) = returned[0]
193 self.assertEqual(None, paths)
194 self.assertEqual(revnum, 0)
195 self.assertEqual(["svn:date"], list(props.keys()))
196 if len(returned[1]) == 3:
197 (paths, revnum, props) = returned[1]
199 (paths, revnum, props, has_children) = returned[1]
200 self.assertEqual({'/foo': ('A', None, -1)}, paths)
201 self.assertEqual(revnum, 1)
202 self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
204 self.ra.get_log(cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
205 self.assertEqual(1, len(returned))
208 self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True,
209 strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"])
210 check_results(returned)
212 def test_get_log_cancel(self):
216 self.assertRaises(KeyError,
217 self.ra.get_log, cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
219 def test_get_commit_editor_double_close(self):
222 editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
223 dir = editor.open_root()
225 self.assertRaises(RuntimeError, dir.close)
227 self.assertRaises(RuntimeError, editor.close)
228 self.assertRaises(RuntimeError, editor.abort)
230 def test_get_commit_editor_busy(self):
233 editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
234 self.assertRaises(ra.BusyException, self.ra.get_commit_editor,
235 {"svn:log": "foo"}, mycb)
238 def test_get_commit_editor_double_open(self):
241 editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
242 root = editor.open_root()
243 root.add_directory("somedir")
244 self.assertRaises(RuntimeError, root.add_directory, "foo")
246 def test_get_commit_editor_custom_revprops(self):
247 if ra.version()[:2] < (1,5):
249 def mycb(paths, rev, revprops):
251 editor = self.ra.get_commit_editor({"svn:log": "foo", "bar:foo": "bla", "svn:custom:blie": "bloe"}, mycb)
252 root = editor.open_root()
253 root.add_directory("somedir").close()
257 revprops = self.ra.rev_proplist(1)
259 set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', 'svn:log']),
260 set(revprops.keys()), "result: %r" % revprops)
262 def test_get_commit_editor_context_manager(self):
263 def mycb(paths, rev, revprops):
265 editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
266 self.assertIs(editor, editor.__enter__())
267 dir = editor.open_root(0)
268 subdir = dir.add_directory("foo")
269 self.assertIs(subdir, subdir.__enter__())
270 subdir.__exit__(None, None, None)
271 dir.__exit__(None, None, None)
272 editor.__exit__(None, None, None)
274 def test_get_commit_editor(self):
275 def mycb(paths, rev, revprops):
277 editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
278 dir = editor.open_root(0)
279 subdir = dir.add_directory("foo")
284 def test_commit_file_props(self):
285 cb = self.commit_editor()
286 f = cb.add_file("bar")
288 f.change_prop("bla:bar", "blie")
291 cb = self.commit_editor()
292 f = cb.open_file("bar")
293 f.change_prop("bla:bar", None)
297 props = self.ra.get_file("bar", stream, 1)[1]
298 self.assertEqual(b"blie", props.get("bla:bar"))
300 props = self.ra.get_file("bar", stream, 2)[1]
301 self.assertIs(None, props.get("bla:bar"))
303 def test_get_file_revs(self):
304 cb = self.commit_editor()
305 cb.add_file("bar").modify(b"a")
308 cb = self.commit_editor()
309 f = cb.open_file("bar")
311 f.change_prop("bla", "bloe")
316 def handle(path, rev, props, from_merge=None):
317 rets.append((path, rev, props))
319 self.ra.get_file_revs("bar", 1, 2, handle)
321 self.assertEqual(2, len(rets))
322 self.assertEqual(1, rets[0][1])
323 self.assertEqual(2, rets[1][1])
324 self.assertEqual("/bar", rets[0][0])
325 self.assertEqual("/bar", rets[1][0])
327 def test_get_file(self):
328 cb = self.commit_editor()
329 cb.add_file("bar").modify(b"a")
333 self.ra.get_file("bar", stream, 1)
335 self.assertEqual(b"a", stream.read())
338 self.ra.get_file("/bar", stream, 1)
340 self.assertEqual(b"a", stream.read())
def test_get_locations_root(self):
    """get_locations("", 0, [0]) maps revision 0 to "/"."""
    expected = {0: "/"}
    locations = self.ra.get_locations("", 0, [0])
    self.assertEqual(expected, locations)
345 def test_check_path(self):
346 cb = self.commit_editor()
350 self.assertEqual(NODE_DIR, self.ra.check_path("bar", 1))
351 self.assertEqual(NODE_DIR, self.ra.check_path("bar/", 1))
352 self.assertEqual(NODE_NONE, self.ra.check_path("blaaaa", 1))
355 cb = self.commit_editor()
359 ret = self.ra.stat("bar", 1)
360 self.assertEqual(set(['last_author', 'kind', 'created_rev', 'has_props', 'time', 'size']), set(ret.keys()))
362 def test_get_locations_dir(self):
363 cb = self.commit_editor()
367 cb = self.commit_editor()
368 cb.add_dir("bla", "bar", 1)
371 cb = self.commit_editor()
375 self.assertEqual({1: "/bar", 2: "/bla"},
376 self.ra.get_locations("bla", 2, [1,2]))
378 self.assertEqual({1: "/bar", 2: "/bar"},
379 self.ra.get_locations("bar", 1, [1,2]))
381 self.assertEqual({1: "/bar", 2: "/bar"},
382 self.ra.get_locations("bar", 2, [1,2]))
384 self.assertEqual({1: "/bar", 2: "/bla", 3: "/bla"},
385 self.ra.get_locations("bla", 3, [1,2,3]))
388 class AuthTests(TestCase):
def test_not_list(self):
    """Passing a bare provider instead of a list to Auth is a TypeError."""
    # The provider is built outside the guarded block so that only the
    # Auth() call itself is expected to raise.
    provider = ra.get_simple_provider()
    with self.assertRaises(TypeError):
        ra.Auth(provider)
393 def test_not_registered(self):
395 self.assertRaises(SubversionException, auth.credentials, "svn.simple", "MyRealm")
def test_simple(self):
    """A simple prompt provider yields one (user, password, 0) result."""
    def prompt(realm, uname, may_save):
        return ("foo", "geheim", False)

    provider = ra.get_simple_prompt_provider(prompt, 0)
    auth = ra.Auth([provider])
    creds = auth.credentials("svn.simple", "MyRealm")
    self.assertEqual(("foo", "geheim", 0), next(creds))
    # The iterator is exhausted after the first result.
    with self.assertRaises(StopIteration):
        next(creds)
def test_username(self):
    """A username prompt provider yields one (username, 0) result."""
    def prompt(realm, may_save):
        return ("somebody", False)

    provider = ra.get_username_prompt_provider(prompt, 0)
    auth = ra.Auth([provider])
    creds = auth.credentials("svn.username", "MyRealm")
    self.assertEqual(("somebody", 0), next(creds))
    # The iterator is exhausted after the first result.
    with self.assertRaises(StopIteration):
        next(creds)
def test_client_cert(self):
    """A client-cert prompt provider yields one (filename, False) result."""
    def prompt(realm, may_save):
        return ("filename", False)

    provider = ra.get_ssl_client_cert_prompt_provider(prompt, 0)
    auth = ra.Auth([provider])
    creds = auth.credentials("svn.ssl.client-cert", "MyRealm")
    self.assertEqual(("filename", False), next(creds))
    # The iterator is exhausted after the first result.
    with self.assertRaises(StopIteration):
        next(creds)
def test_client_cert_pw(self):
    """A client-cert passphrase provider yields one (password, False)."""
    def prompt(realm, may_save):
        return ("supergeheim", False)

    provider = ra.get_ssl_client_cert_pw_prompt_provider(prompt, 0)
    auth = ra.Auth([provider])
    creds = auth.credentials("svn.ssl.client-passphrase", "MyRealm")
    self.assertEqual(("supergeheim", False), next(creds))
    # The iterator is exhausted after the first result.
    with self.assertRaises(StopIteration):
        next(creds)
def test_server_trust(self):
    """A server-trust prompt returning (42, False) yields (42, 0) once."""
    def prompt(realm, failures, certinfo, may_save):
        return (42, False)

    provider = ra.get_ssl_server_trust_prompt_provider(prompt)
    auth = ra.Auth([provider])
    auth.set_parameter("svn:auth:ssl:failures", 23)
    creds = auth.credentials("svn.ssl.server", "MyRealm")
    self.assertEqual((42, 0), next(creds))
    # The iterator is exhausted after the first result.
    with self.assertRaises(StopIteration):
        next(creds)
def test_server_untrust(self):
    """A server-trust prompt returning None yields no credentials."""
    def prompt(realm, failures, certinfo, may_save):
        return None

    provider = ra.get_ssl_server_trust_prompt_provider(prompt)
    auth = ra.Auth([provider])
    auth.set_parameter("svn:auth:ssl:failures", 23)
    creds = auth.credentials("svn.ssl.server", "MyRealm")
    # The very first next() already signals exhaustion.
    with self.assertRaises(StopIteration):
        next(creds)
434 def test_retry(self):
436 def inc_foo(realm, may_save):
438 return ("somebody%d" % self.i, False)
439 auth = ra.Auth([ra.get_username_prompt_provider(inc_foo, 2)])
440 creds = auth.credentials("svn.username", "MyRealm")
441 self.assertEqual(("somebody1", 0), next(creds))
442 self.assertEqual(("somebody2", 0), next(creds))
443 self.assertEqual(("somebody3", 0), next(creds))
444 self.assertRaises(StopIteration, next, creds)
446 def test_set_default_username(self):
448 a.set_parameter("svn:auth:username", "foo")
449 self.assertEqual("foo", a.get_parameter("svn:auth:username"))
451 def test_set_default_password(self):
453 a.set_parameter("svn:auth:password", "bar")
454 self.assertEqual("bar", a.get_parameter("svn:auth:password"))
def test_platform_auth_providers(self):
    """Auth accepts the platform-specific client providers."""
    # There is no assertion: the test passes if construction succeeds.
    providers = ra.get_platform_specific_client_providers()
    ra.Auth(providers)