Add (mostly) Python3 support.
[jelmer/subvertpy.git] / subvertpy / tests / test_ra.py
1 # Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@jelmer.uk>
2  
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published by
5 # the Free Software Foundation; either version 2.1 of the License, or
6 # (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
16 """Subversion ra library tests."""
17
18 from io import BytesIO
19
20 from subvertpy import (
21     NODE_DIR, NODE_NONE, NODE_UNKNOWN,
22     SubversionException,
23     ra,
24     )
25 from subvertpy.tests import (
26     SubversionTestCase,
27     TestCase,
28     )
29
30 class VersionTest(TestCase):
31
32     def test_version_length(self):
33         self.assertEqual(4, len(ra.version()))
34
35     def test_api_version_length(self):
36         self.assertEqual(4, len(ra.api_version()))
37
38     def test_api_version_later_same(self):
39         self.assertTrue(ra.api_version() <= ra.version())
40
41
42 class TestRemoteAccessUnknown(TestCase):
43
44     def test_unknown_url(self):
45         self.assertRaises(SubversionException, ra.RemoteAccess, "bla://")
46
47
48 class TestRemoteAccess(SubversionTestCase):
49
50     def setUp(self):
51         super(TestRemoteAccess, self).setUp()
52         self.repos_url = self.make_repository("d")
53         self.ra = ra.RemoteAccess(self.repos_url,
54                 auth=ra.Auth([ra.get_username_provider()]))
55
56     def tearDown(self):
57         del self.ra
58         super(TestRemoteAccess, self).tearDown()
59
60     def commit_editor(self):
61         return self.get_commit_editor(self.repos_url)
62
63     def do_commit(self):
64         dc = self.get_commit_editor(self.repos_url)
65         dc.add_dir("foo")
66         dc.close()
67
68     def test_repr(self):
69         self.assertEqual("RemoteAccess(\"%s\")" % self.repos_url,
70                           repr(self.ra))
71
72     def test_latest_revnum(self):
73         self.assertEqual(0, self.ra.get_latest_revnum())
74
75     def test_latest_revnum_one(self):
76         self.do_commit()
77         self.assertEqual(1, self.ra.get_latest_revnum())
78
79     def test_get_uuid(self):
80         self.assertEqual(36, len(self.ra.get_uuid()))
81
82     def test_get_repos_root(self):
83         self.assertEqual(self.repos_url, self.ra.get_repos_root())
84
85     def test_get_url(self):
86         if ra.api_version() < (1, 5):
87             self.assertRaises(NotImplementedError, self.ra.get_url)
88         else:
89             self.assertEqual(self.repos_url, self.ra.get_url())
90
91     def test_reparent(self):
92         self.ra.reparent(self.repos_url)
93
94     def test_has_capability(self):
95         if ra.api_version() < (1, 5):
96             self.assertRaises(NotImplementedError, self.ra.has_capability, "FOO")
97         else:
98             self.assertRaises(SubversionException, self.ra.has_capability, "FOO")
99
100     def test_get_dir(self):
101         ret = self.ra.get_dir("", 0)
102         self.assertIsInstance(ret, tuple)
103
104     def test_get_dir_leading_slash(self):
105         ret = self.ra.get_dir("/", 0)
106         self.assertIsInstance(ret, tuple)
107
108     def test_get_dir_kind(self):
109         self.do_commit()
110         (dirents, fetch_rev, props) = self.ra.get_dir("/", 1, fields=ra.DIRENT_KIND)
111         self.assertIsInstance(props, dict)
112         self.assertEqual(1, fetch_rev)
113         self.assertEqual(NODE_DIR, dirents["foo"]["kind"])
114
115     def test_change_rev_prop(self):
116         self.do_commit()
117         self.ra.change_rev_prop(1, "foo", "bar")
118
119     def test_rev_proplist(self):
120         self.assertIsInstance(self.ra.rev_proplist(0), dict)
121
122     def test_do_diff(self):
123         self.do_commit()
124
125         class MyFileEditor:
126             def change_prop(self, name, val): pass 
127             def close(self, checksum=None): pass
128
129         class MyDirEditor:
130             def change_prop(self, name, val): pass 
131             def add_directory(self, *args): return MyDirEditor()
132             def add_file(self, *args): return MyFileEditor()
133             def close(self): pass
134
135         class MyEditor:
136             def set_target_revision(self, rev): pass 
137             def open_root(self, base_rev):
138                 return MyDirEditor()
139             def close(self): pass
140         reporter = self.ra.do_diff(1, "", self.ra.get_repos_root(), MyEditor())
141         reporter.set_path("", 0, True)
142         reporter.finish()
143         self.assertRaises(RuntimeError, reporter.finish)
144         self.assertRaises(RuntimeError, reporter.set_path, "", 0, True)
145
146     def test_iter_log_invalid(self):
147         self.assertRaises(SubversionException, list, self.ra.iter_log(
148                 ["idontexist"], 0, 0, revprops=["svn:date", "svn:author", "svn:log"]))
149         self.assertRaises(SubversionException, list, self.ra.iter_log(
150                 [""], 0, 1000, revprops=["svn:date", "svn:author", "svn:log"]))
151
152     def test_iter_log(self):
153         def check_results(returned):
154             self.assertEqual(2, len(returned))
155             self.assertTrue(len(returned[0]) in (3,4))
156             if len(returned[0]) == 3:
157                 (paths, revnum, props) = returned[0]
158             else:
159                 (paths, revnum, props, has_children) = returned[0]
160             self.assertEqual(None, paths)
161             self.assertEqual(revnum, 0)
162             self.assertEqual(["svn:date"], list(props.keys()))
163             if len(returned[1]) == 3:
164                 (paths, revnum, props) = returned[1]
165             else:
166                 (paths, revnum, props, has_children) = returned[1]
167             if ra.api_version() < (1, 6):
168                 self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, paths)
169             else:
170                 self.assertEqual({'/foo': ('A', None, -1, NODE_DIR)}, paths)
171             self.assertEqual(revnum, 1)
172             self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), 
173                               set(props.keys()))
174         returned = list(self.ra.iter_log([""], 0, 0,
175             revprops=["svn:date", "svn:author", "svn:log"]))
176         self.assertEqual(1, len(returned))
177         self.do_commit()
178         returned = list(self.ra.iter_log(None, 0, 1, discover_changed_paths=True, 
179             strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"]))
180         check_results(returned)
181
182     def test_get_log(self):
183         returned = []
184         def cb(*args):
185             returned.append(args)
186         def check_results(returned):
187             self.assertEqual(2, len(returned))
188             self.assertTrue(len(returned[0]) in (3,4))
189             if len(returned[0]) == 3:
190                 (paths, revnum, props) = returned[0]
191             else:
192                 (paths, revnum, props, has_children) = returned[0]
193             self.assertEqual(None, paths)
194             self.assertEqual(revnum, 0)
195             self.assertEqual(["svn:date"], list(props.keys()))
196             if len(returned[1]) == 3:
197                 (paths, revnum, props) = returned[1]
198             else:
199                 (paths, revnum, props, has_children) = returned[1]
200             self.assertEqual({'/foo': ('A', None, -1)}, paths)
201             self.assertEqual(revnum, 1)
202             self.assertEqual(set(["svn:date", "svn:author", "svn:log"]), 
203                              set(props.keys()))
204         self.ra.get_log(cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
205         self.assertEqual(1, len(returned))
206         self.do_commit()
207         returned = []
208         self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True, 
209                         strict_node_history=False, revprops=["svn:date", "svn:author", "svn:log"])
210         check_results(returned)
211
212     def test_get_log_cancel(self):
213         def cb(*args):
214             raise KeyError
215         self.do_commit()
216         self.assertRaises(KeyError,
217             self.ra.get_log, cb, [""], 0, 0, revprops=["svn:date", "svn:author", "svn:log"])
218
219     def test_get_commit_editor_double_close(self):
220         def mycb(*args):
221             pass
222         editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
223         dir = editor.open_root()
224         dir.close()
225         self.assertRaises(RuntimeError, dir.close)
226         editor.close()
227         self.assertRaises(RuntimeError, editor.close)
228         self.assertRaises(RuntimeError, editor.abort)
229
230     def test_get_commit_editor_busy(self):
231         def mycb(rev):
232             pass
233         editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
234         self.assertRaises(ra.BusyException, self.ra.get_commit_editor,
235             {"svn:log": "foo"}, mycb)
236         editor.abort()
237
238     def test_get_commit_editor_double_open(self):
239         def mycb(rev):
240             pass
241         editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
242         root = editor.open_root()
243         root.add_directory("somedir")
244         self.assertRaises(RuntimeError, root.add_directory, "foo")
245
246     def test_get_commit_editor_custom_revprops(self):
247         if ra.version()[:2] < (1,5):
248             return
249         def mycb(paths, rev, revprops):
250             pass
251         editor = self.ra.get_commit_editor({"svn:log": "foo", "bar:foo": "bla", "svn:custom:blie": "bloe"}, mycb)
252         root = editor.open_root()
253         root.add_directory("somedir").close()
254         root.close()
255         editor.close()
256
257         revprops = self.ra.rev_proplist(1)
258         self.assertEqual(
259             set(['bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', 'svn:log']),
260             set(revprops.keys()), "result: %r" % revprops)
261
262     def test_get_commit_editor_context_manager(self):
263         def mycb(paths, rev, revprops):
264             pass
265         editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
266         self.assertIs(editor, editor.__enter__())
267         dir = editor.open_root(0)
268         subdir = dir.add_directory("foo")
269         self.assertIs(subdir, subdir.__enter__())
270         subdir.__exit__(None, None, None)
271         dir.__exit__(None, None, None)
272         editor.__exit__(None, None, None)
273
274     def test_get_commit_editor(self):
275         def mycb(paths, rev, revprops):
276             pass
277         editor = self.ra.get_commit_editor({"svn:log": "foo"}, mycb)
278         dir = editor.open_root(0)
279         subdir = dir.add_directory("foo")
280         subdir.close()
281         dir.close()
282         editor.close()
283
284     def test_commit_file_props(self):
285         cb = self.commit_editor()
286         f = cb.add_file("bar")
287         f.modify(b"a")
288         f.change_prop("bla:bar", "blie")
289         cb.close()
290
291         cb = self.commit_editor()
292         f = cb.open_file("bar")
293         f.change_prop("bla:bar", None)
294         cb.close()
295
296         stream = BytesIO()
297         props = self.ra.get_file("bar", stream, 1)[1]
298         self.assertEqual(b"blie", props.get("bla:bar"))
299         stream = BytesIO()
300         props = self.ra.get_file("bar", stream, 2)[1]
301         self.assertIs(None, props.get("bla:bar"))
302
303     def test_get_file_revs(self):
304         cb = self.commit_editor()
305         cb.add_file("bar").modify(b"a")
306         cb.close()
307
308         cb = self.commit_editor()
309         f = cb.open_file("bar")
310         f.modify(b"b")
311         f.change_prop("bla", "bloe")
312         cb.close()
313
314         rets = []
315
316         def handle(path, rev, props, from_merge=None):
317             rets.append((path, rev, props))
318
319         self.ra.get_file_revs("bar", 1, 2, handle)
320
321         self.assertEqual(2, len(rets))
322         self.assertEqual(1, rets[0][1])
323         self.assertEqual(2, rets[1][1])
324         self.assertEqual("/bar", rets[0][0])
325         self.assertEqual("/bar", rets[1][0])
326
327     def test_get_file(self):
328         cb = self.commit_editor()
329         cb.add_file("bar").modify(b"a")
330         cb.close()
331
332         stream = BytesIO()
333         self.ra.get_file("bar", stream, 1)
334         stream.seek(0)
335         self.assertEqual(b"a", stream.read())
336
337         stream = BytesIO()
338         self.ra.get_file("/bar", stream, 1)
339         stream.seek(0)
340         self.assertEqual(b"a", stream.read())
341
342     def test_get_locations_root(self):
343         self.assertEqual({0: "/"}, self.ra.get_locations("", 0, [0]))
344
345     def test_check_path(self):
346         cb = self.commit_editor()
347         cb.add_dir("bar")
348         cb.close()
349
350         self.assertEqual(NODE_DIR, self.ra.check_path("bar", 1))
351         self.assertEqual(NODE_DIR, self.ra.check_path("bar/", 1))
352         self.assertEqual(NODE_NONE, self.ra.check_path("blaaaa", 1))
353
354     def test_stat(self):
355         cb = self.commit_editor()
356         cb.add_dir("bar")
357         cb.close()
358
359         ret = self.ra.stat("bar", 1)
360         self.assertEqual(set(['last_author', 'kind', 'created_rev', 'has_props', 'time', 'size']), set(ret.keys()))
361
362     def test_get_locations_dir(self):
363         cb = self.commit_editor()
364         cb.add_dir("bar")
365         cb.close()
366
367         cb = self.commit_editor()
368         cb.add_dir("bla", "bar", 1)
369         cb.close()
370
371         cb = self.commit_editor()
372         cb.delete("bar")
373         cb.close()
374
375         self.assertEqual({1: "/bar", 2: "/bla"}, 
376                           self.ra.get_locations("bla", 2, [1,2]))
377
378         self.assertEqual({1: "/bar", 2: "/bar"}, 
379                           self.ra.get_locations("bar", 1, [1,2]))
380
381         self.assertEqual({1: "/bar", 2: "/bar"}, 
382                           self.ra.get_locations("bar", 2, [1,2]))
383
384         self.assertEqual({1: "/bar", 2: "/bla", 3: "/bla"}, 
385                           self.ra.get_locations("bla", 3, [1,2,3]))
386
387
388 class AuthTests(TestCase):
389
390     def test_not_list(self):
391         self.assertRaises(TypeError, ra.Auth, ra.get_simple_provider())
392
393     def test_not_registered(self):
394         auth = ra.Auth([])
395         self.assertRaises(SubversionException, auth.credentials, "svn.simple", "MyRealm")
396
397     def test_simple(self):
398         auth = ra.Auth([ra.get_simple_prompt_provider(lambda realm, uname, may_save: ("foo", "geheim", False), 0)])
399         creds = auth.credentials("svn.simple", "MyRealm")
400         self.assertEqual(("foo", "geheim", 0), next(creds))
401         self.assertRaises(StopIteration, next, creds)
402
403     def test_username(self):
404         auth = ra.Auth([ra.get_username_prompt_provider(lambda realm, may_save: ("somebody", False), 0)])
405         creds = auth.credentials("svn.username", "MyRealm")
406         self.assertEqual(("somebody", 0), next(creds))
407         self.assertRaises(StopIteration, next, creds)
408
409     def test_client_cert(self):
410         auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(lambda realm, may_save: ("filename", False), 0)])
411         creds = auth.credentials("svn.ssl.client-cert", "MyRealm")
412         self.assertEqual(("filename", False), next(creds))
413         self.assertRaises(StopIteration, next, creds)
414
415     def test_client_cert_pw(self):
416         auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider(lambda realm, may_save: ("supergeheim", False), 0)])
417         creds = auth.credentials("svn.ssl.client-passphrase", "MyRealm")
418         self.assertEqual(("supergeheim", False), next(creds))
419         self.assertRaises(StopIteration, next, creds)
420
421     def test_server_trust(self):
422         auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: (42, False))])
423         auth.set_parameter("svn:auth:ssl:failures", 23)
424         creds = auth.credentials("svn.ssl.server", "MyRealm")
425         self.assertEqual((42, 0), next(creds))
426         self.assertRaises(StopIteration, next, creds)
427
428     def test_server_untrust(self):
429         auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(lambda realm, failures, certinfo, may_save: None)])
430         auth.set_parameter("svn:auth:ssl:failures", 23)
431         creds = auth.credentials("svn.ssl.server", "MyRealm")
432         self.assertRaises(StopIteration, next, creds)
433
434     def test_retry(self):
435         self.i = 0
436         def inc_foo(realm, may_save):
437             self.i += 1
438             return ("somebody%d" % self.i, False)
439         auth = ra.Auth([ra.get_username_prompt_provider(inc_foo, 2)])
440         creds = auth.credentials("svn.username", "MyRealm")
441         self.assertEqual(("somebody1", 0), next(creds))
442         self.assertEqual(("somebody2", 0), next(creds))
443         self.assertEqual(("somebody3", 0), next(creds))
444         self.assertRaises(StopIteration, next, creds)
445
446     def test_set_default_username(self):
447         a = ra.Auth([])
448         a.set_parameter("svn:auth:username", "foo")
449         self.assertEqual("foo", a.get_parameter("svn:auth:username"))
450
451     def test_set_default_password(self):
452         a = ra.Auth([])
453         a.set_parameter("svn:auth:password", "bar")
454         self.assertEqual("bar", a.get_parameter("svn:auth:password"))
455
456     def test_platform_auth_providers(self):
457         ra.Auth(ra.get_platform_specific_client_providers())