Fix some bugs in apr pool handling.
[jelmer/subvertpy.git] / transport.py
1 # Copyright (C) 2006 Jelmer Vernooij <jelmer@samba.org>
2
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16 """Simple transport for accessing Subversion smart servers."""
17
18 from bzrlib import debug, urlutils
19 from bzrlib.errors import (NoSuchFile, NotBranchError, TransportNotPossible, 
20                            FileExists, NotLocalUrl, InvalidURL)
21 from bzrlib.trace import mutter
22 from bzrlib.transport import Transport
23
24 from core import SubversionException
25 from auth import create_auth_baton
26 import ra
27 import core
28 import constants
29
30 from errors import convert_svn_error, NoSvnRepositoryPresent
31 import urlparse
32 import urllib
33
# Subversion client configuration, obtained from core.get_config() once when
# this module is imported.
# NOTE(review): not referenced by any code visible in this file — confirm it
# is used elsewhere before relying on it.
svn_config = core.get_config()
35
def get_client_string():
    """Return a string that can be sent as part of the User-Agent string.

    The result has the form ``bzr<bzrlib version>+bzr-svn<plugin version>``.
    """
    # Only ``debug`` and ``urlutils`` are imported from bzrlib at module
    # level, so the bare name ``bzrlib`` is not bound in this module.
    # Import it explicitly; importing the plugin package also makes
    # ``bzrlib.plugins.svn`` reachable as an attribute.
    import bzrlib
    import bzrlib.plugins.svn
    return "bzr%s+bzr-svn%s" % (bzrlib.__version__,
                                bzrlib.plugins.svn.__version__)
39
def get_test_permutations():
    """Return the transport test permutations for this module.

    SvnRaTransport is deliberately not a full Transport implementation, so
    no generic transport tests are run against it and the result is always
    a fresh, empty list.
    """
    return list()
44
45
def get_svn_ra_transport(bzr_transport):
    """Return an SvnRaTransport corresponding to *bzr_transport*.

    If *bzr_transport* already is an SvnRaTransport it is returned as-is;
    otherwise a new SvnRaTransport is opened on its ``base`` URL.
    """
    if not isinstance(bzr_transport, SvnRaTransport):
        return SvnRaTransport(bzr_transport.base)
    return bzr_transport
52
53
54 def _url_unescape_uri(url):
55     (scheme, netloc, path, query, fragment) = urlparse.urlsplit(url)
56     path = urllib.unquote(path)
57     return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
58
59
def bzr_to_svn_url(url):
    """Translate a Bazaar URL into one the Subversion libraries accept.

    A leading ``svn+`` is dropped from ``svn+http://``, ``svn+https://``
    and ``svn+file://`` URLs (other ``svn+`` schemes are left alone),
    percent-escapes in the path of URLs starting with ``http`` are
    decoded, and trailing slashes are removed.
    """
    stripped_prefixes = ("svn+http://", "svn+file://", "svn+https://")
    if url.startswith(stripped_prefixes):
        url = url[len("svn+"):]

    # Without unescaping, URLs with + in them break.
    if url.startswith("http"):
        url = _url_unescape_uri(url)

    # The SVN libraries don't like trailing slashes...
    return url.rstrip('/')
78
79
def needs_busy(unbound):
    """Decorator that marks a connection as busy while a method runs on it.

    The wrapper calls ``self._mark_busy()`` before delegating to *unbound*
    and ``self._unmark_busy()`` afterwards, even if *unbound* raises.
    Name, docstring, module and attribute dict are copied from *unbound*
    via ``functools.wraps``.
    """
    import functools

    @functools.wraps(unbound)
    def convert(self, *args, **kwargs):
        self._mark_busy()
        try:
            return unbound(self, *args, **kwargs)
        finally:
            # Always release the busy flag so the connection can be reused.
            self._unmark_busy()

    return convert
93
94
95 class Connection(object):
96     """An single connection to a Subversion repository. This usually can 
97     only do one operation at a time."""
98     def __init__(self, url):
99         self._busy = False
100         self._root = None
101         self._unbusy_handler = None
102         self.url = url
103         try:
104             self.mutter('opening SVN RA connection to %r' % url)
105             self._ra = ra.RemoteAccess(url.encode('utf8'), 
106                     auth=create_auth_baton(self.url))
107             # FIXME: Callbacks
108         except SubversionException, (_, num):
109             if num in (constants.ERR_RA_SVN_REPOS_NOT_FOUND,):
110                 raise NoSvnRepositoryPresent(url=url)
111             if num == constants.ERR_BAD_URL:
112                 raise InvalidURL(url)
113             raise
114
115         from bzrlib.plugins.svn import lazy_check_versions
116         lazy_check_versions()
117
118     def is_busy(self):
119         return self._busy
120
121     def _mark_busy(self):
122         assert not self._busy, "already busy"
123         self._busy = True
124
125     def set_unbusy_handler(self, handler):
126         self._unbusy_handler = handler
127
128     def _unmark_busy(self):
129         assert self._busy, "not busy"
130         self._busy = False
131         if self._unbusy_handler is not None:
132             self._unbusy_handler()
133             self._unbusy_handler = None
134
135     def mutter(self, text):
136         if 'transport' in debug.debug_flags:
137                 mutter(text)
138
139     @convert_svn_error
140     def get_uuid(self):
141         self.mutter('svn get-uuid')
142         return self._ra.get_uuid()
143
144     @convert_svn_error
145     @needs_busy
146     def get_repos_root(self):
147         if self._root is None:
148             self.mutter("svn get-repos-root")
149             self._root = self._ra.get_repos_root()
150         return self._root
151
152     @convert_svn_error
153     def get_latest_revnum(self):
154         self.mutter("svn get-latest-revnum")
155         return self._ra.get_latest_revnum()
156
157     @convert_svn_error
158     def do_switch(self, switch_rev, recurse, switch_url, editor):
159         self.mutter('svn switch -r %d -> %r' % (switch_rev, switch_url))
160         return self._ra.do_switch(switch_rev, "", recurse, switch_url, editor)
161
162     @convert_svn_error
163     def change_rev_prop(self, revnum, name, value):
164         self.mutter('svn revprop -r%d --set %s=%s' % (revnum, name, value))
165         self._ra.change_rev_prop(revnum, name, value)
166  
167     @convert_svn_error
168     @needs_busy
169     def get_dir(self, path, revnum, pool=None, kind=False):
170         self.mutter("svn ls -r %d '%r'" % (revnum, path))
171         assert len(path) == 0 or path[0] != "/"
172         # ra_dav backends fail with strange errors if the path starts with a 
173         # slash while other backends don't.
174         fields = 0
175         if kind:
176             fields += core.SVN_DIRENT_KIND
177         return self._ra.get_dir(path, revnum, fields)
178
179     @convert_svn_error
180     def get_lock(self, path):
181         return self._ra.get_lock(path)
182
183     @convert_svn_error
184     def unlock(self, locks, break_lock=False):
185         def lock_cb(baton, path, do_lock, lock, ra_err):
186             pass
187         return self._ra.unlock(locks, break_lock, lock_cb)
188
189     @convert_svn_error
190     def lock_write(self, path_revs, comment=None, steal_lock=False):
191         return self.PhonyLock() # FIXME
192         tokens = {}
193         def lock_cb(baton, path, do_lock, lock, ra_err):
194             tokens[path] = lock
195         self._ra.lock(path_revs, comment, steal_lock, lock_cb)
196         return SvnLock(self, tokens)
197
198     @convert_svn_error
199     @needs_busy
200     def check_path(self, path, revnum):
201         assert len(path) == 0 or path[0] != "/"
202         self.mutter("svn check_path -r%d %s" % (revnum, path))
203         return self._ra.check_path(path.encode('utf-8'), revnum)
204
205     @convert_svn_error
206     def mkdir(self, relpath, mode=None):
207         assert len(relpath) == 0 or relpath[0] != "/"
208         path = urlutils.join(self.url, relpath)
209         try:
210             self._client.mkdir([path.encode("utf-8")])
211         except SubversionException, (msg, num):
212             if num == constants.ERR_FS_NOT_FOUND:
213                 raise NoSuchFile(path)
214             if num == constants.ERR_FS_ALREADY_EXISTS:
215                 raise FileExists(path)
216             raise
217
218     @convert_svn_error
219     def replay(self, revision, low_water_mark, send_deltas, editor):
220         self.mutter('svn replay -r%r:%r' % (low_water_mark, revision))
221         self._ra.replay(revision, low_water_mark, editor, send_deltas)
222
223     @convert_svn_error
224     def do_update(self, revnum, recurse, editor):
225         self.mutter('svn update -r %r' % revnum)
226         return self._ra.do_update(revnum, "", recurse, editor)
227
228     @convert_svn_error
229     def has_capability(self, cap):
230         return self._ra.has_capability(cap)
231
232     @convert_svn_error
233     def revprop_list(self, revnum):
234         self.mutter('svn revprop-list -r %r' % revnum)
235         return self._ra.rev_proplist(revnum)
236
237     @convert_svn_error
238     def get_commit_editor(self, revprops, done_cb, lock_token, keep_locks):
239         return self._ra.get_commit_editor(revprops, done_cb, lock_token, 
240                                           keep_locks)
241
242     class SvnLock(object):
243         def __init__(self, connection, tokens):
244             self._tokens = tokens
245             self._connection = connection
246
247         def unlock(self):
248             self._connection.unlock(self.locks)
249
250     @convert_svn_error
251     @needs_busy
252     def lock_write(self, path_revs, comment=None, steal_lock=False):
253         tokens = {}
254         def lock_cb(baton, path, do_lock, lock, ra_err, pool):
255             tokens[path] = lock
256         self._ra.lock(path_revs, comment, steal_lock, lock_cb)
257         return SvnLock(self, tokens)
258
259     @convert_svn_error
260     @needs_busy
261     def get_log(self, paths, from_revnum, to_revnum, limit, 
262                 discover_changed_paths, strict_node_history, revprops, rcvr):
263         # No paths starting with slash, please
264         assert paths is None or all([not p.startswith("/") for p in paths])
265         self.mutter('svn log %r:%r %r (limit: %r)' % (from_revnum, to_revnum, paths, limit))
266         return self._ra.get_log(rcvr, paths, 
267                        from_revnum, to_revnum, limit, 
268                        discover_changed_paths, strict_node_history, 
269                        revprops)
270
271     @convert_svn_error
272     @needs_busy
273     def reparent(self, url):
274         if self.url == url:
275             return
276         if hasattr(self._ra, 'reparent'):
277             self.mutter('svn reparent %r' % url)
278             self._ra.reparent(url)
279             self.url = url
280         else:
281             raise NotImplementedError(self.reparent)
282
283
class ConnectionPool(object):
    """Collection of idle connections to Subversion repositories.

    Connections are checked out with get() and handed back with add(); a
    connection must not be busy while it sits in the pool.
    """

    def __init__(self):
        self.connections = set()

    def get(self, url):
        """Check out a connection for *url*, removing it from the pool.

        An idle connection already at *url* is preferred. Failing that, an
        arbitrary pooled connection is reparented to *url*. If the pool is
        empty, or reparenting is not implemented, a new Connection is
        opened. Any other reparent error puts the connection back into the
        pool and is re-raised.
        """
        for conn in self.connections:
            assert not conn.is_busy(), "busy connection in pool"
            if conn.url != url:
                continue
            self.connections.remove(conn)
            return conn

        # Nothing available? Just pick an existing one and reparent:
        if not self.connections:
            return Connection(url)

        conn = self.connections.pop()
        try:
            conn.reparent(url)
        except NotImplementedError:
            self.connections.add(conn)
            return Connection(url)
        except:
            self.connections.add(conn)
            raise
        return conn

    def add(self, connection):
        """Return an idle *connection* to the pool."""
        assert not connection.is_busy(), "adding busy connection in pool"
        self.connections.add(connection)
311         assert not connection.is_busy(), "adding busy connection in pool"
312         self.connections.add(connection)
313     
314
315 class SvnRaTransport(Transport):
316     """Fake transport for Subversion-related namespaces.
317     
318     This implements just as much of Transport as is necessary 
319     to fool Bazaar. """
320     @convert_svn_error
321     def __init__(self, url="", _backing_url=None, pool=None):
322         bzr_url = url
323         self.svn_url = bzr_to_svn_url(url)
324         # _backing_url is an evil hack so the root directory of a repository 
325         # can be accessed on some HTTP repositories. 
326         if _backing_url is None:
327             _backing_url = self.svn_url
328         self._backing_url = _backing_url.rstrip("/")
329         Transport.__init__(self, bzr_url)
330
331         if pool is None:
332             self.connections = ConnectionPool()
333
334             # Make sure that the URL is valid by connecting to it.
335             self.connections.add(self.connections.get(self._backing_url))
336         else:
337             self.connections = pool
338
339         from bzrlib.plugins.svn import lazy_check_versions
340         lazy_check_versions()
341
342     def get_connection(self):
343         return self.connections.get(self._backing_url)
344
345     def add_connection(self, conn):
346         self.connections.add(conn)
347
348     def has(self, relpath):
349         """See Transport.has()."""
350         # TODO: Raise TransportNotPossible here instead and 
351         # catch it in bzrdir.py
352         return False
353
354     def get(self, relpath):
355         """See Transport.get()."""
356         # TODO: Raise TransportNotPossible here instead and 
357         # catch it in bzrdir.py
358         raise NoSuchFile(path=relpath)
359
360     def stat(self, relpath):
361         """See Transport.stat()."""
362         raise TransportNotPossible('stat not supported on Subversion')
363
364     def get_uuid(self):
365         conn = self.get_connection()
366         try:
367             return conn.get_uuid()
368         finally:
369             self.add_connection(conn)
370
371     def get_repos_root(self):
372         root = self.get_svn_repos_root()
373         if (self.base.startswith("svn+http:") or 
374             self.base.startswith("svn+https:")):
375             return "svn+%s" % root
376         return root
377
378     def get_svn_repos_root(self):
379         conn = self.get_connection()
380         try:
381             return conn.get_repos_root()
382         finally:
383             self.add_connection(conn)
384
385     def get_latest_revnum(self):
386         conn = self.get_connection()
387         try:
388             return conn.get_latest_revnum()
389         finally:
390             self.add_connection(conn)
391
392     def do_switch(self, switch_rev, recurse, switch_url, editor, pool=None):
393         conn = self._open_real_transport()
394         conn.set_unbusy_handler(lambda: self.add_connection(conn))
395         return conn.do_switch(switch_rev, recurse, switch_url, editor, pool)
396
397     def iter_log(self, paths, from_revnum, to_revnum, limit, discover_changed_paths, 
398                  strict_node_history, revprops):
399         assert paths is None or isinstance(paths, list)
400         assert paths is None or all([isinstance(x, str) for x in paths])
401         assert isinstance(from_revnum, int) and isinstance(to_revnum, int)
402         assert isinstance(limit, int)
403         from threading import Thread, Semaphore
404
405         class logfetcher(Thread):
406             def __init__(self, transport, **kwargs):
407                 Thread.__init__(self)
408                 self.setDaemon(True)
409                 self.transport = transport
410                 self.kwargs = kwargs
411                 self.pending = []
412                 self.conn = None
413                 self.semaphore = Semaphore(0)
414
415             def next(self):
416                 self.semaphore.acquire()
417                 ret = self.pending.pop(0)
418                 if ret is None:
419                     self.transport.add_connection(self.conn)
420                 elif isinstance(ret, Exception):
421                     self.transport.add_connection(self.conn)
422                     raise ret
423                 return ret
424
425             def run(self):
426                 assert self.conn is None, "already running"
427                 def rcvr(*args):
428                     self.pending.append(args)
429                     self.semaphore.release()
430                 self.conn = self.transport.get_connection()
431                 try:
432                     self.conn.get_log(rcvr=rcvr, **self.kwargs)
433                     self.pending.append(None)
434                 except Exception, e:
435                     self.pending.append(e)
436                 self.semaphore.release()
437
438         if paths is None:
439             newpaths = None
440         else:
441             newpaths = [self._request_path(path) for path in paths]
442         
443         fetcher = logfetcher(self, paths=newpaths, from_revnum=from_revnum, to_revnum=to_revnum, limit=limit, discover_changed_paths=discover_changed_paths, strict_node_history=strict_node_history, revprops=revprops)
444         fetcher.start()
445         return iter(fetcher.next, None)
446
447     def get_log(self, paths, from_revnum, to_revnum, limit, discover_changed_paths, 
448                 strict_node_history, revprops, rcvr, pool=None):
449         assert paths is None or isinstance(paths, list), "Invalid paths"
450         assert paths is None or all([isinstance(x, str) for x in paths])
451
452         if paths is None:
453             newpaths = None
454         else:
455             newpaths = [self._request_path(path) for path in paths]
456
457         conn = self.get_connection()
458         try:
459             return conn.get_log(newpaths, 
460                     from_revnum, to_revnum,
461                     limit, discover_changed_paths, strict_node_history, 
462                     revprops, rcvr)
463         finally:
464             self.add_connection(conn)
465
466     def _open_real_transport(self):
467         if self._backing_url != self.svn_url:
468             return self.connections.get(self.svn_url)
469         return self.get_connection()
470
471     def change_rev_prop(self, revnum, name, value, pool=None):
472         conn = self.get_connection()
473         try:
474             return conn.change_rev_prop(revnum, name, value, pool)
475         finally:
476             self.add_connection(conn)
477
478     def get_dir(self, path, revnum, pool=None, kind=False):
479         path = self._request_path(path)
480         conn = self.get_connection()
481         try:
482             return conn.get_dir(path, revnum, pool, kind)
483         finally:
484             self.add_connection(conn)
485
486     def mutter(self, text):
487         if 'transport' in debug.debug_flags:
488             mutter(text)
489
490     def _request_path(self, relpath):
491         if self._backing_url == self.svn_url:
492             return relpath.strip("/")
493         newsvnurl = urlutils.join(self.svn_url, relpath)
494         if newsvnurl == self._backing_url:
495             return ""
496         newrelpath = urlutils.relative_url(self._backing_url+"/", newsvnurl+"/").strip("/")
497         self.mutter('request path %r -> %r' % (relpath, newrelpath))
498         return newrelpath
499
500     def list_dir(self, relpath):
501         assert len(relpath) == 0 or relpath[0] != "/"
502         if relpath == ".":
503             relpath = ""
504         try:
505             (dirents, _, _) = self.get_dir(relpath, self.get_latest_revnum())
506         except SubversionException, (msg, num):
507             if num == constants.ERR_FS_NOT_DIRECTORY:
508                 raise NoSuchFile(relpath)
509             raise
510         return dirents.keys()
511
512     def check_path(self, path, revnum):
513         path = self._request_path(path)
514         conn = self.get_connection()
515         try:
516             return conn.check_path(path, revnum)
517         finally:
518             self.add_connection(conn)
519
520     def mkdir(self, relpath, mode=None):
521         conn = self.get_connection()
522         try:
523             return conn.mkdir(relpath, mode)
524         finally:
525             self.add_connection(conn)
526
527     def replay(self, revision, low_water_mark, send_deltas, editor, pool=None):
528         conn = self._open_real_transport()
529         try:
530             return conn.replay(revision, low_water_mark, 
531                                              send_deltas, editor, pool)
532         finally:
533             self.add_connection(conn)
534
535     def do_update(self, revnum, recurse, editor, pool=None):
536         conn = self._open_real_transport()
537         conn.set_unbusy_handler(lambda: self.add_connection(conn))
538         return conn.do_update(revnum, recurse, editor, pool)
539
540     def has_capability(self, cap):
541         conn = self.get_connection()
542         try:
543             return conn.has_capability(cap)
544         finally:
545             self.add_connection(conn)
546
547     def revprop_list(self, revnum):
548         conn = self.get_connection()
549         try:
550             return conn.revprop_list(revnum)
551         finally:
552             self.add_connection(conn)
553
554     def get_commit_editor(self, revprops, done_cb, lock_token, keep_locks):
555         conn = self._open_real_transport()
556         conn.set_unbusy_handler(lambda: self.add_connection(conn))
557         return conn.get_commit_editor(revprops, done_cb,
558                                      lock_token, keep_locks)
559
560     def listable(self):
561         """See Transport.listable().
562         """
563         return True
564
565     # There is no real way to do locking directly on the transport 
566     # nor is there a need to as the remote server will take care of 
567     # locking
568     class PhonyLock(object):
569         def unlock(self):
570             pass
571
572     def lock_read(self, relpath):
573         """See Transport.lock_read()."""
574         return self.PhonyLock()
575
576     def lock_write(self, path_revs, comment=None, steal_lock=False):
577         return self.PhonyLock() # FIXME
578
579     def _is_http_transport(self):
580         return (self.svn_url.startswith("http://") or 
581                 self.svn_url.startswith("https://"))
582
583     def clone_root(self):
584         if self._is_http_transport():
585             return SvnRaTransport(self.get_repos_root(), 
586                                   bzr_to_svn_url(self.base),
587                                   pool=self.connections)
588         return SvnRaTransport(self.get_repos_root(),
589                               pool=self.connections)
590
591     def clone(self, offset=None):
592         """See Transport.clone()."""
593         if offset is None:
594             return SvnRaTransport(self.base, pool=self.connections)
595
596         return SvnRaTransport(urlutils.join(self.base, offset), pool=self.connections)
597
598     def local_abspath(self, relpath):
599         """See Transport.local_abspath()."""
600         absurl = self.abspath(relpath)
601         if self.base.startswith("file:///"):
602             return urlutils.local_path_from_url(absurl)
603         raise NotLocalUrl(absurl)
604
605     def abspath(self, relpath):
606         """See Transport.abspath()."""
607         return urlutils.join(self.base, relpath)