More upgrade improvements.
[jelmer/subvertpy.git] / tests / test_convert.py
1 # Copyright (C) 2006-2007 Jelmer Vernooij <jelmer@samba.org>
2
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17 """Full repository conversion tests."""
18
19 from bzrlib.branch import Branch
20 from bzrlib.bzrdir import BzrDir, format_registry
21 from bzrlib.errors import NotBranchError, NoSuchFile, IncompatibleRepositories
22 from bzrlib.urlutils import local_path_to_url
23 from bzrlib.repository import Repository
24 from bzrlib.tests import TestCaseInTempDir
25
26 import os, sys
27
28 from bzrlib.plugins.svn import repos
29 from bzrlib.plugins.svn.layout import RootLayout, TrunkLayout
30 from bzrlib.plugins.svn.convert import convert_repository, NotDumpFile, load_dumpfile
31 from bzrlib.plugins.svn.format import get_rich_root_format
32 from bzrlib.plugins.svn.tests import SubversionTestCase
33
34 class TestLoadDumpfile(TestCaseInTempDir):
35     def test_loaddumpfile(self):
36         dumpfile = os.path.join(self.test_dir, "dumpfile")
37         open(dumpfile, 'w').write(
38 """SVN-fs-dump-format-version: 2
39
40 UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
41
42 Revision-number: 0
43 Prop-content-length: 56
44 Content-length: 56
45
46 K 8
47 svn:date
48 V 27
49 2006-07-02T13:14:51.972532Z
50 PROPS-END
51 """)
52         load_dumpfile(dumpfile, "d")
53         fs = repos.Repository("d").fs()
54         self.assertEqual("6987ef2d-cd6b-461f-9991-6f1abef3bd59", 
55                 fs.get_uuid())
56
57     def test_loaddumpfile_invalid(self):
58         dumpfile = os.path.join(self.test_dir, "dumpfile")
59         open(dumpfile, 'w').write("""FooBar\n""")
60         self.assertRaises(NotDumpFile, load_dumpfile, dumpfile, "d")
61
62
63 class TestConversion(SubversionTestCase):
64     def setUp(self):
65         super(TestConversion, self).setUp()
66         self.repos_url = self.make_repository('d')
67
68         dc = self.get_commit_editor()
69         t = dc.add_dir("trunk")
70         t.add_file("trunk/file").modify("data")
71         bs = dc.add_dir("branches")
72         ab = bs.add_dir("branches/abranch")
73         ab.add_file("branches/abranch/anotherfile").modify("data2")
74         dc.close()
75
76         dc = self.get_commit_editor()
77         t = dc.open_dir("trunk")
78         t.open_file("trunk/file").modify("otherdata")
79         dc.close()
80
81     def get_commit_editor(self):
82         return super(TestConversion, self).get_commit_editor(self.repos_url)
83
84     def test_sets_parent_urls(self):
85         convert_repository(Repository.open(self.repos_url), "e", 
86                            TrunkLayout(0), 
87                            all=False, create_shared_repo=True)
88         self.assertEquals(self.repos_url+"/trunk", 
89                 Branch.open("e/trunk").get_parent())
90         self.assertEquals(self.repos_url+"/branches/abranch", 
91                 Branch.open("e/branches/abranch").get_parent())
92
93     def test_fetch_alive(self):
94         dc = self.get_commit_editor()
95         bs = dc.open_dir("branches")
96         sb = bs.add_dir("branches/somebranch")
97         sb.add_file("branches/somebranch/somefile").modify('data')
98         dc.close()
99
100         dc = self.get_commit_editor()
101         bs = dc.open_dir("branches")
102         bs.delete("branches/somebranch")
103         dc.close()
104
105         oldrepos = Repository.open(self.repos_url)
106         convert_repository(oldrepos, "e", 
107                            TrunkLayout(0), 
108                            all=False, create_shared_repo=True)
109         newrepos = Repository.open("e")
110         oldrepos.set_layout(TrunkLayout(0))
111         self.assertFalse(newrepos.has_revision(oldrepos.generate_revision_id(2, "branches/somebranch", oldrepos.get_mapping())))
112
113     def test_fetch_filebranch(self):
114         dc = self.get_commit_editor()
115         bs = dc.open_dir("branches")
116         bs.add_file("branches/somebranch").modify('data')
117         dc.close()
118
119         oldrepos = Repository.open(self.repos_url)
120         convert_repository(oldrepos, "e", TrunkLayout(0))
121         newrepos = Repository.open("e")
122         oldrepos.set_layout(TrunkLayout(0))
123         self.assertFalse(newrepos.has_revision(oldrepos.generate_revision_id(2, "branches/somebranch", oldrepos.get_mapping())))
124
125     def test_fetch_dead(self):
126         dc = self.get_commit_editor()
127         bs = dc.open_dir("branches")
128         sb = bs.add_dir("branches/somebranch")
129         sb.add_file("branches/somebranch/somefile").modify('data')
130         dc.close()
131
132         dc = self.get_commit_editor()
133         bs = dc.open_dir("branches")
134         bs.delete("branches/somebranch")
135         dc.close()
136
137         oldrepos = Repository.open(self.repos_url)
138         convert_repository(oldrepos, "e", TrunkLayout(0), 
139                            all=True, create_shared_repo=True)
140         newrepos = Repository.open("e")
141         self.assertTrue(newrepos.has_revision(
142             oldrepos.generate_revision_id(3, "branches/somebranch", oldrepos.get_mapping())))
143
144     def test_fetch_filter(self):
145         dc = self.get_commit_editor()
146         branches = dc.open_dir("branches")
147         dc.add_dir("branches/somebranch")
148         dc.add_file("branches/somebranch/somefile").modify('data')
149         dc.close()
150
151         dc = self.get_commit_editor()
152         branches = dc.open_dir("branches")
153         ab = branches.add_dir("branches/anotherbranch")
154         ab.add_file("branches/anotherbranch/somefile").modify('data')
155         dc.close()
156
157         oldrepos = Repository.open(self.repos_url)
158         convert_repository(oldrepos, "e", TrunkLayout(0), 
159             create_shared_repo=True,
160             filter_branch=lambda branch: branch.get_branch_path().endswith("somebranch"))
161         newrepos = Repository.open("e")
162         self.assertTrue(os.path.exists("e/branches/somebranch"))
163         self.assertFalse(os.path.exists("e/branches/anotherbranch"))
164
165     def test_shared_import_continue(self):
166         dir = BzrDir.create("e", format=get_rich_root_format())
167         dir.create_repository(shared=True)
168
169         convert_repository(Repository.open(self.repos_url), "e", 
170                 TrunkLayout(0), create_shared_repo=True)
171
172         self.assertTrue(Repository.open("e").is_shared())
173
174     def test_shared_import_continue_remove(self):
175         convert_repository(Repository.open(self.repos_url), "e", 
176                 TrunkLayout(0), create_shared_repo=True)
177
178         dc = self.get_commit_editor()
179         dc.delete("trunk")
180         dc.close()
181
182         dc = self.get_commit_editor()
183         trunk = dc.add_dir("trunk")
184         trunk.add_file("trunk/file").modify()
185         dc.close()
186
187         convert_repository(Repository.open(self.repos_url), "e", 
188                            TrunkLayout(0), create_shared_repo=True)
189
190     def test_shared_import_remove_nokeep(self):
191         convert_repository(Repository.open(self.repos_url), "e", 
192                 TrunkLayout(0), create_shared_repo=True)
193
194         dc = self.get_commit_editor()
195         dc.delete("trunk")
196         dc.close()
197
198         self.assertTrue(os.path.exists("e/trunk"))
199
200         convert_repository(Repository.open(self.repos_url), "e", 
201                            TrunkLayout(0), create_shared_repo=True)
202
203         self.assertFalse(os.path.exists("e/trunk"))
204
205     def test_shared_import_continue_with_wt(self):
206         convert_repository(Repository.open(self.repos_url), "e", 
207                 TrunkLayout(0), working_trees=True)
208         convert_repository(Repository.open(self.repos_url), "e", 
209                 TrunkLayout(0), working_trees=True)
210
211     def test_shared_import_rootlayout_empty(self):
212         dir = BzrDir.create("e", format=get_rich_root_format())
213         dir.create_repository(shared=True)
214
215         convert_repository(Repository.open(self.repos_url), "e", 
216                 RootLayout(), create_shared_repo=True)
217
218     def test_shared_import_with_wt(self):
219         dir = BzrDir.create("e", format=get_rich_root_format())
220         dir.create_repository(shared=True)
221
222         convert_repository(Repository.open(self.repos_url), "e", 
223                 TrunkLayout(0), create_shared_repo=True, 
224                 working_trees=True)
225
226         self.assertTrue(os.path.isfile(os.path.join(
227                         self.test_dir, "e", "trunk", "file")))
228
229     def test_shared_import_without_wt(self):
230         dir = BzrDir.create("e", format=get_rich_root_format())
231         dir.create_repository(shared=True)
232
233         convert_repository(Repository.open(self.repos_url), "e", 
234                 TrunkLayout(0), create_shared_repo=True, 
235                 working_trees=False)
236
237         self.assertFalse(os.path.isfile(os.path.join(
238                         self.test_dir, "e", "trunk", "file")))
239
240     def test_shared_import_old_repos_fails(self):
241         dir = BzrDir.create("e", format=format_registry.make_bzrdir('knit'))
242         dir.create_repository(shared=True)
243
244         self.assertRaises(IncompatibleRepositories, 
245             lambda: convert_repository(Repository.open(self.repos_url), "e", 
246                 TrunkLayout(0), create_shared_repo=True, 
247                 working_trees=False))
248
249     def test_shared_import_continue_branch(self):
250         oldrepos = Repository.open(self.repos_url)
251         convert_repository(oldrepos, "e", 
252                 TrunkLayout(0), create_shared_repo=True)
253
254         mapping = oldrepos.get_mapping()
255
256         dc = self.get_commit_editor()
257         trunk = dc.open_dir("trunk")
258         trunk.open_file("trunk/file").modify()
259         dc.close()
260
261         self.assertEqual(
262                 Repository.open(self.repos_url).generate_revision_id(2, "trunk", mapping), 
263                 Branch.open("e/trunk").last_revision())
264
265         convert_repository(Repository.open(self.repos_url), "e", 
266                 TrunkLayout(0), create_shared_repo=True)
267
268         self.assertEqual(Repository.open(self.repos_url).generate_revision_id(3, "trunk", mapping), 
269                         Branch.open("e/trunk").last_revision())
270
271  
272     def test_shared_import(self):
273         convert_repository(Repository.open(self.repos_url), "e", 
274                 TrunkLayout(0), create_shared_repo=True)
275
276         self.assertTrue(Repository.open("e").is_shared())
277     
278     def test_simple(self):
279         convert_repository(Repository.open(self.repos_url), os.path.join(self.test_dir, "e"), TrunkLayout(0))
280         self.assertTrue(os.path.isdir(os.path.join(self.test_dir, "e", "trunk")))
281         self.assertTrue(os.path.isdir(os.path.join(self.test_dir, "e", "branches", "abranch")))
282
283     def test_convert_to_nonexistant(self):
284         self.assertRaises(NoSuchFile, convert_repository, Repository.open(self.repos_url), os.path.join(self.test_dir, "e", "foo", "bar"), TrunkLayout(0))
285
286     def test_notshared_import(self):
287         convert_repository(Repository.open(self.repos_url), "e", 
288                            TrunkLayout(0), create_shared_repo=False)
289
290         self.assertRaises(NotBranchError, Repository.open, "e")
291
292 class TestConversionFromDumpfile(SubversionTestCase):
293     def test_dumpfile_open_empty(self):
294         dumpfile = os.path.join(self.test_dir, "dumpfile")
295         open(dumpfile, 'w').write(
296 """SVN-fs-dump-format-version: 2
297
298 UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
299
300 Revision-number: 0
301 Prop-content-length: 56
302 Content-length: 56
303
304 K 8
305 svn:date
306 V 27
307 2006-07-02T13:14:51.972532Z
308 PROPS-END
309 """)
310         branch_path = os.path.join(self.test_dir, "f")
311         repos = self.load_dumpfile(dumpfile, 'g')
312         convert_repository(repos, branch_path, RootLayout())
313         branch = Repository.open(branch_path)
314         mapping = branch.repository.get_mapping()
315         self.assertEqual([mapping.revision_id_foreign_to_bzr(("6987ef2d-cd6b-461f-9991-6f1abef3bd59", 0, ""))], branch.all_revision_ids())
316         Branch.open(branch_path)
317
318     def load_dumpfile(self, dumpfile, target_path):
319         load_dumpfile(dumpfile, target_path)
320         return Repository.open(target_path)
321
322     def test_dumpfile_open_empty_trunk(self):
323         dumpfile = os.path.join(self.test_dir, "dumpfile")
324         open(dumpfile, 'w').write(
325 """SVN-fs-dump-format-version: 2
326
327 UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
328
329 Revision-number: 0
330 Prop-content-length: 56
331 Content-length: 56
332
333 K 8
334 svn:date
335 V 27
336 2006-07-02T13:14:51.972532Z
337 PROPS-END
338 """)
339         branch_path = os.path.join(self.test_dir, "f")
340         repos = self.load_dumpfile(dumpfile, 'g')
341         convert_repository(repos, branch_path, TrunkLayout(0))
342         repository = Repository.open(branch_path)
343         self.assertEqual([], repository.all_revision_ids())
344         self.assertRaises(NotBranchError, Branch.open, branch_path)
345
346     def test_open_internal(self):
347         filename = os.path.join(self.test_dir, "dumpfile")
348         open(filename, 'w').write(
349 """SVN-fs-dump-format-version: 2
350
351 UUID: 6987ef2d-cd6b-461f-9991-6f1abef3bd59
352
353 Revision-number: 0
354 Prop-content-length: 56
355 Content-length: 56
356
357 K 8
358 svn:date
359 V 27
360 2006-07-02T13:14:51.972532Z
361 PROPS-END
362
363 Revision-number: 1
364 Prop-content-length: 109
365 Content-length: 109
366
367 K 7
368 svn:log
369 V 9
370 Add trunk
371 K 10
372 svn:author
373 V 6
374 jelmer
375 K 8
376 svn:date
377 V 27
378 2006-07-02T13:58:02.528258Z
379 PROPS-END
380
381 Node-path: trunk
382 Node-kind: dir
383 Node-action: add
384 Prop-content-length: 10
385 Content-length: 10
386
387 PROPS-END
388
389
390 Node-path: trunk/bla
391 Node-kind: file
392 Node-action: add
393 Prop-content-length: 10
394 Text-content-length: 5
395 Text-content-md5: 6137cde4893c59f76f005a8123d8e8e6
396 Content-length: 15
397
398 PROPS-END
399 data
400
401
402 """)
403         repos = self.load_dumpfile(filename, 'g')
404         convert_repository(repos, os.path.join(self.test_dir, "e"), 
405                            TrunkLayout(0))
406         mapping = repos.get_mapping()
407         abspath = self.test_dir
408         if sys.platform == 'win32':
409             abspath = '/' + abspath
410         branch = Branch.open(os.path.join(self.test_dir, "e", "trunk"))
411         self.assertEqual(local_path_to_url(os.path.join(self.test_dir, "e", "trunk")), branch.base.rstrip("/"))
412         self.assertEqual(mapping.revision_id_foreign_to_bzr(("6987ef2d-cd6b-461f-9991-6f1abef3bd59", 1, 'trunk')), branch.last_revision())
413