# server.py -- Implementation of the server side git protocols
-# Copryight (C) 2008 John Carr <john.carr@unrouted.co.uk>
+# Copyright (C) 2008 John Carr <john.carr@unrouted.co.uk>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
-
"""Git smart network protocol server implementation.
For more detailed implementation on the network protocol, see the
Documentation/technical directory in the cgit distribution, and in particular:
- Documentation/technical/protocol-capabilities.txt
- Documentation/technical/pack-protocol.txt
+
+* Documentation/technical/protocol-capabilities.txt
+* Documentation/technical/pack-protocol.txt
"""
import collections
+import socket
import SocketServer
-import tempfile
+import sys
+import zlib
from dulwich.errors import (
ApplyDeltaError,
ChecksumMismatch,
GitProtocolError,
+ UnexpectedCommandError,
+ ObjectFormatException,
)
+from dulwich import log_utils
from dulwich.objects import (
hex_to_sha,
)
+from dulwich.pack import (
+ PackStreamReader,
+ write_pack_data,
+ )
from dulwich.protocol import (
- Protocol,
+ MULTI_ACK,
+ MULTI_ACK_DETAILED,
ProtocolFile,
+ ReceivableProtocol,
+ SINGLE_ACK,
TCP_GIT_PORT,
+ ZERO_SHA,
+ ack_type,
extract_capabilities,
extract_want_line_capabilities,
- SINGLE_ACK,
- MULTI_ACK,
- MULTI_ACK_DETAILED,
- ack_type,
+ BufferedPktLineWriter,
)
from dulwich.repo import (
Repo,
)
-from dulwich.pack import (
- write_pack_data,
- )
+
+
+logger = log_utils.getLogger(__name__)
+
class Backend(object):
+ """A backend for the Git smart server implementation."""
+
+ def open_repository(self, path):
+ """Open the repository at a path."""
+ raise NotImplementedError(self.open_repository)
+
+
+class BackendRepo(object):
+ """Repository abstraction used by the Git server.
+
+ Please note that the methods required here are a
+ subset of those provided by dulwich.repo.Repo.
+ """
+
+ object_store = None
+ refs = None
def get_refs(self):
"""
"""
raise NotImplementedError
- def apply_pack(self, refs, read):
- """ Import a set of changes into a repository and update the refs
+ def get_peeled(self, name):
+ """Return the cached peeled value of a ref, if available.
- :param refs: list of tuple(name, sha)
- :param read: callback to read from the incoming pack
+ :param name: Name of the ref to peel
+ :return: The peeled value of the ref. If the ref is known not to point to
+ a tag, this will be the SHA the ref refers to. If no cached
+ information about a tag is available, this method may return None,
+ but it should attempt to peel the tag if possible.
"""
- raise NotImplementedError
+ return None
- def fetch_objects(self, determine_wants, graph_walker, progress):
+ def fetch_objects(self, determine_wants, graph_walker, progress,
+ get_tagged=None):
"""
Yield the objects required for a list of commits.
:param progress: is a callback to send progress messages to the client
+ :param get_tagged: Function that returns a dict of pointed-to sha -> tag
+ sha for including tags.
"""
raise NotImplementedError
-class GitBackend(Backend):
+class PackStreamCopier(PackStreamReader):
+ """Class to verify a pack stream as it is being read.
- def __init__(self, repo=None):
- if repo is None:
- repo = Repo(tmpfile.mkdtemp())
- self.repo = repo
- self.object_store = self.repo.object_store
- self.fetch_objects = self.repo.fetch_objects
- self.get_refs = self.repo.get_refs
+ The pack is read from a ReceivableProtocol using read() or recv() as
+ appropriate and written out to the given file-like object.
+ """
- def apply_pack(self, refs, read):
- f, commit = self.repo.object_store.add_thin_pack()
- all_exceptions = (IOError, OSError, ChecksumMismatch, ApplyDeltaError)
- status = []
- unpack_error = None
- # TODO: more informative error messages than just the exception string
- try:
- # TODO: decode the pack as we stream to avoid blocking reads beyond
- # the end of data (when using HTTP/1.1 chunked encoding)
- while True:
- data = read(10240)
- if not data:
- break
- f.write(data)
- except all_exceptions, e:
- unpack_error = str(e).replace('\n', '')
- try:
- commit()
- except all_exceptions, e:
- if not unpack_error:
- unpack_error = str(e).replace('\n', '')
+ def __init__(self, read_all, read_some, outfile):
+ super(PackStreamCopier, self).__init__(read_all, read_some)
+ self.outfile = outfile
- if unpack_error:
- status.append(('unpack', unpack_error))
- else:
- status.append(('unpack', 'ok'))
+ def _read(self, read, size):
+ data = super(PackStreamCopier, self)._read(read, size)
+ self.outfile.write(data)
+ return data
- for oldsha, sha, ref in refs:
- # TODO: check refname
- ref_error = None
- try:
- if ref == "0" * 40:
- try:
- del self.repo.refs[ref]
- except all_exceptions:
- ref_error = 'failed to delete'
- else:
- try:
- self.repo.refs[ref] = sha
- except all_exceptions:
- ref_error = 'failed to write'
- except KeyError, e:
- ref_error = 'bad ref'
- if ref_error:
- status.append((ref, ref_error))
- else:
- status.append((ref, 'ok'))
+ def verify(self):
+ """Verify a pack stream and write it to the output file.
+ See PackStreamReader.read_objects for a list of exceptions this may
+ throw.
+ """
+ for _, _, _ in self.read_objects():
+ pass
- print "pack applied"
- return status
+
+class DictBackend(Backend):
+ """Trivial backend that looks up Git repositories in a dictionary."""
+
+ def __init__(self, repos):
+ self.repos = repos
+
+ def open_repository(self, path):
+ logger.debug('Opening repository at %s', path)
+ # FIXME: Decide what to do when there is no repo at this path.
+ return self.repos[path]
class Handler(object):
"""Smart protocol command handler base class."""
- def __init__(self, backend, read, write):
+ def __init__(self, backend, proto):
self.backend = backend
- self.proto = Protocol(read, write)
+ self.proto = proto
self._client_capabilities = None
- def capability_line(self):
- return " ".join(self.capabilities())
+ @classmethod
+ def capability_line(cls):
+ return " ".join(cls.capabilities())
+
+ @classmethod
+ def capabilities(cls):
+ raise NotImplementedError(cls.capabilities)
- def capabilities(self):
- raise NotImplementedError(self.capabilities)
+ @classmethod
+ def innocuous_capabilities(cls):
+ return ("include-tag", "thin-pack", "no-progress", "ofs-delta")
+
+ @classmethod
+ def required_capabilities(cls):
+ """Return a list of capabilities that we require the client to have."""
+ return []
def set_client_capabilities(self, caps):
- my_caps = self.capabilities()
+ allowable_caps = set(self.innocuous_capabilities())
+ allowable_caps.update(self.capabilities())
for cap in caps:
- if cap not in my_caps:
+ if cap not in allowable_caps:
raise GitProtocolError('Client asked for capability %s that '
'was not advertised.' % cap)
+ for cap in self.required_capabilities():
+ if cap not in caps:
+ raise GitProtocolError('Client does not support required '
+ 'capability %s.' % cap)
self._client_capabilities = set(caps)
+ logger.info('Client capabilities: %s', caps)
def has_capability(self, cap):
if self._client_capabilities is None:
class UploadPackHandler(Handler):
"""Protocol handler for uploading a pack to the server."""
- def __init__(self, backend, read, write,
+ def __init__(self, backend, args, proto,
stateless_rpc=False, advertise_refs=False):
- Handler.__init__(self, backend, read, write)
+ Handler.__init__(self, backend, proto)
+ self.repo = backend.open_repository(args[0])
self._graph_walker = None
self.stateless_rpc = stateless_rpc
self.advertise_refs = advertise_refs
- def capabilities(self):
+ @classmethod
+ def capabilities(cls):
return ("multi_ack_detailed", "multi_ack", "side-band-64k", "thin-pack",
- "ofs-delta", "no-progress")
+ "ofs-delta", "no-progress", "include-tag")
+
+ @classmethod
+ def required_capabilities(cls):
+ return ("side-band-64k", "thin-pack", "ofs-delta")
def progress(self, message):
if self.has_capability("no-progress"):
return
self.proto.write_sideband(2, message)
+ def get_tagged(self, refs=None, repo=None):
+ """Get a dict of peeled values of tags to their original tag shas.
+
+ :param refs: dict of refname -> sha of possible tags; defaults to all of
+ the backend's refs.
+ :param repo: optional Repo instance for getting peeled refs; defaults to
+ the backend's repo, if available
+ :return: dict of peeled_sha -> tag_sha, where tag_sha is the sha of a
+ tag whose peeled value is peeled_sha.
+ """
+ if not self.has_capability("include-tag"):
+ return {}
+ if refs is None:
+ refs = self.repo.get_refs()
+ if repo is None:
+ repo = getattr(self.repo, "repo", None)
+ if repo is None:
+ # Bail if we don't have a Repo available; this is ok since
+ # clients must be able to handle if the server doesn't include
+ # all relevant tags.
+ # TODO: fix behavior when no Repo is available
+ return {}
+ tagged = {}
+ for name, sha in refs.iteritems():
+ peeled_sha = repo.get_peeled(name)
+ if peeled_sha != sha:
+ tagged[peeled_sha] = sha
+ return tagged
+
def handle(self):
write = lambda x: self.proto.write_sideband(1, x)
- graph_walker = ProtocolGraphWalker(self)
- objects_iter = self.backend.fetch_objects(
- graph_walker.determine_wants, graph_walker, self.progress)
+ graph_walker = ProtocolGraphWalker(self, self.repo.object_store,
+ self.repo.get_peeled)
+ objects_iter = self.repo.fetch_objects(
+ graph_walker.determine_wants, graph_walker, self.progress,
+ get_tagged=self.get_tagged)
# Do they want any objects?
if len(objects_iter) == 0:
self.proto.write("0000")
+def _split_proto_line(line, allowed):
+ """Split a line read from the wire.
+
+ :param line: The line read from the wire.
+ :param allowed: An iterable of command names that should be allowed.
+ Command names not listed below as possible return values will be
+ ignored. If None, any commands from the possible return values are
+ allowed.
+ :return: a tuple having one of the following forms:
+ ('want', obj_id)
+ ('have', obj_id)
+ ('done', None)
+ (None, None) (for a flush-pkt)
+
+ :raise UnexpectedCommandError: if the line cannot be parsed into one of the
+ allowed return values.
+ """
+ if not line:
+ fields = [None]
+ else:
+ fields = line.rstrip('\n').split(' ', 1)
+ command = fields[0]
+ if allowed is not None and command not in allowed:
+ raise UnexpectedCommandError(command)
+ try:
+ if len(fields) == 1 and command in ('done', None):
+ return (command, None)
+ elif len(fields) == 2 and command in ('want', 'have'):
+ hex_to_sha(fields[1])
+ return tuple(fields)
+ except (TypeError, AssertionError), e:
+ raise GitProtocolError(e)
+ raise GitProtocolError('Received invalid line from client: %s' % line)
+
+
class ProtocolGraphWalker(object):
"""A graph walker that knows the git protocol.
- As a graph walker, this class implements ack(), next(), and reset(). It also
- contains some base methods for interacting with the wire and walking the
- commit tree.
+ As a graph walker, this class implements ack(), next(), and reset(). It
+ also contains some base methods for interacting with the wire and walking
+ the commit tree.
The work of determining which acks to send is passed on to the
implementation instance stored in _impl. The reason for this is that we do
call to set_ack_level() is required to set up the implementation, before any
calls to next() or ack() are made.
"""
- def __init__(self, handler):
+ def __init__(self, handler, object_store, get_peeled):
self.handler = handler
- self.store = handler.backend.object_store
+ self.store = object_store
+ self.get_peeled = get_peeled
self.proto = handler.proto
self.stateless_rpc = handler.stateless_rpc
self.advertise_refs = handler.advertise_refs
if not i:
line = "%s\x00%s" % (line, self.handler.capability_line())
self.proto.write_pkt_line("%s\n" % line)
- # TODO: include peeled value of any tags
+ peeled_sha = self.get_peeled(ref)
+ if peeled_sha != sha:
+ self.proto.write_pkt_line('%s %s^{}\n' %
+ (peeled_sha, ref))
# i'm done..
self.proto.write_pkt_line(None)
line, caps = extract_want_line_capabilities(want)
self.handler.set_client_capabilities(caps)
self.set_ack_type(ack_type(caps))
- command, sha = self._split_proto_line(line)
+ allowed = ('want', None)
+ command, sha = _split_proto_line(line, allowed)
want_revs = []
while command != None:
- if command != 'want':
- raise GitProtocolError(
- 'Protocol got unexpected command %s' % command)
if sha not in values:
raise GitProtocolError(
- 'Client wants invalid object %s' % sha)
+ 'Client wants invalid object %s' % sha)
want_revs.append(sha)
- command, sha = self.read_proto_line()
+ command, sha = self.read_proto_line(allowed)
self.set_wants(want_revs)
return want_revs
return None
return self._cache[self._cache_index]
- def _split_proto_line(self, line):
- fields = line.rstrip('\n').split(' ', 1)
- if len(fields) == 1 and fields[0] == 'done':
- return ('done', None)
- elif len(fields) == 2 and fields[0] in ('want', 'have'):
- try:
- hex_to_sha(fields[1])
- return tuple(fields)
- except (TypeError, AssertionError), e:
- raise GitProtocolError(e)
- raise GitProtocolError('Received invalid line from client:\n%s' % line)
-
- def read_proto_line(self):
+ def read_proto_line(self, allowed):
"""Read a line from the wire.
- :return: a tuple having one of the following forms:
- ('want', obj_id)
- ('have', obj_id)
- ('done', None)
- (None, None) (for a flush-pkt)
-
- :raise GitProtocolError: if the line cannot be parsed into one of the
- possible return values.
+ :param allowed: An iterable of command names that should be allowed.
+ :return: A tuple of (command, value); see _split_proto_line.
+ :raise GitProtocolError: If an error occurred reading the line.
"""
- line = self.proto.read_pkt_line()
- if not line:
- return (None, None)
- return self._split_proto_line(line)
+ return _split_proto_line(self.proto.read_pkt_line(), allowed)
def send_ack(self, sha, ack_type=''):
if ack_type:
commit = pending.popleft()
if commit.id in haves:
return True
- if not getattr(commit, 'get_parents', None):
+ if commit.type_name != "commit":
# non-commit wants are assumed to be satisfied
continue
- for parent in commit.get_parents():
+ for parent in commit.parents:
parent_obj = self.store[parent]
# TODO: handle parents with later commit times than children
if parent_obj.commit_time >= earliest:
def set_ack_type(self, ack_type):
impl_classes = {
- MULTI_ACK: MultiAckGraphWalkerImpl,
- MULTI_ACK_DETAILED: MultiAckDetailedGraphWalkerImpl,
- SINGLE_ACK: SingleAckGraphWalkerImpl,
- }
+ MULTI_ACK: MultiAckGraphWalkerImpl,
+ MULTI_ACK_DETAILED: MultiAckDetailedGraphWalkerImpl,
+ SINGLE_ACK: SingleAckGraphWalkerImpl,
+ }
self._impl = impl_classes[ack_type](self)
+_GRAPH_WALKER_COMMANDS = ('have', 'done', None)
+
+
class SingleAckGraphWalkerImpl(object):
"""Graph walker implementation that speaks the single-ack protocol."""
self._sent_ack = True
def next(self):
- command, sha = self.walker.read_proto_line()
+ command, sha = self.walker.read_proto_line(_GRAPH_WALKER_COMMANDS)
if command in (None, 'done'):
if not self._sent_ack:
self.walker.send_nak()
def next(self):
while True:
- command, sha = self.walker.read_proto_line()
+ command, sha = self.walker.read_proto_line(_GRAPH_WALKER_COMMANDS)
if command is None:
self.walker.send_nak()
# in multi-ack mode, a flush-pkt indicates the client wants to
def next(self):
while True:
- command, sha = self.walker.read_proto_line()
+ command, sha = self.walker.read_proto_line(_GRAPH_WALKER_COMMANDS)
if command is None:
self.walker.send_nak()
if self.walker.stateless_rpc:
class ReceivePackHandler(Handler):
"""Protocol handler for downloading a pack from the client."""
- def __init__(self, backend, read, write,
+ def __init__(self, backend, args, proto,
stateless_rpc=False, advertise_refs=False):
- Handler.__init__(self, backend, read, write)
+ Handler.__init__(self, backend, proto)
+ self.repo = backend.open_repository(args[0])
self.stateless_rpc = stateless_rpc
self.advertise_refs = advertise_refs
- def capabilities(self):
- return ("report-status", "delete-refs")
+ @classmethod
+ def capabilities(cls):
+ return ("report-status", "delete-refs", "side-band-64k")
+
+ def _apply_pack(self, refs):
+ f, commit = self.repo.object_store.add_thin_pack()
+ all_exceptions = (IOError, OSError, ChecksumMismatch, ApplyDeltaError,
+ AssertionError, socket.error, zlib.error,
+ ObjectFormatException)
+ status = []
+ # TODO: more informative error messages than just the exception string
+ try:
+ PackStreamCopier(self.proto.read, self.proto.recv, f).verify()
+ p = commit()
+ if not p:
+ raise IOError('Failed to write pack')
+ p.check()
+ status.append(('unpack', 'ok'))
+ except all_exceptions, e:
+ status.append(('unpack', str(e).replace('\n', '')))
+ # The pack may still have been moved in, but it may contain broken
+ # objects. We trust a later GC to clean it up.
+
+ for oldsha, sha, ref in refs:
+ ref_status = 'ok'
+ try:
+ if sha == ZERO_SHA:
+ if not 'delete-refs' in self.capabilities():
+ raise GitProtocolError(
+ 'Attempted to delete refs without delete-refs '
+ 'capability.')
+ try:
+ del self.repo.refs[ref]
+ except all_exceptions:
+ ref_status = 'failed to delete'
+ else:
+ try:
+ self.repo.refs[ref] = sha
+ except all_exceptions:
+ ref_status = 'failed to write'
+ except KeyError, e:
+ ref_status = 'bad ref'
+ status.append((ref, ref_status))
+
+ return status
+
+ def _report_status(self, status):
+ if self.has_capability('side-band-64k'):
+ writer = BufferedPktLineWriter(
+ lambda d: self.proto.write_sideband(1, d))
+ write = writer.write
+
+ def flush():
+ writer.flush()
+ self.proto.write_pkt_line(None)
+ else:
+ write = self.proto.write_pkt_line
+ flush = lambda: None
+
+ for name, msg in status:
+ if name == 'unpack':
+ write('unpack %s\n' % msg)
+ elif msg == 'ok':
+ write('ok %s\n' % name)
+ else:
+ write('ng %s %s\n' % (name, msg))
+ write(None)
+ flush()
def handle(self):
- refs = self.backend.get_refs().items()
+ refs = self.repo.get_refs().items()
if self.advertise_refs or not self.stateless_rpc:
if refs:
self.proto.write_pkt_line(
- "%s %s\x00%s\n" % (refs[0][1], refs[0][0],
- self.capability_line()))
+ "%s %s\x00%s\n" % (refs[0][1], refs[0][0],
+ self.capability_line()))
for i in range(1, len(refs)):
ref = refs[i]
self.proto.write_pkt_line("%s %s\n" % (ref[1], ref[0]))
else:
- self.proto.write_pkt_line("0000000000000000000000000000000000000000 capabilities^{} %s" % self.capability_line())
+ self.proto.write_pkt_line("%s capabilities^{} %s" % (
+ ZERO_SHA, self.capability_line()))
self.proto.write("0000")
if self.advertise_refs:
ref = self.proto.read_pkt_line()
# backend can now deal with this refs and read a pack using self.read
- status = self.backend.apply_pack(client_refs, self.proto.read)
+ status = self._apply_pack(client_refs)
# when we have read all the pack from the client, send a status report
# if the client asked for it
if self.has_capability('report-status'):
- for name, msg in status:
- if name == 'unpack':
- self.proto.write_pkt_line('unpack %s\n' % msg)
- elif msg == 'ok':
- self.proto.write_pkt_line('ok %s\n' % name)
- else:
- self.proto.write_pkt_line('ng %s %s\n' % (name, msg))
- self.proto.write_pkt_line(None)
+ self._report_status(status)
+
+
+# Default handler classes for git services.
+DEFAULT_HANDLERS = {
+ 'git-upload-pack': UploadPackHandler,
+ 'git-receive-pack': ReceivePackHandler,
+ }
class TCPGitRequestHandler(SocketServer.StreamRequestHandler):
+ def __init__(self, handlers, *args, **kwargs):
+ self.handlers = handlers
+ SocketServer.StreamRequestHandler.__init__(self, *args, **kwargs)
+
def handle(self):
- proto = Protocol(self.rfile.read, self.wfile.write)
+ proto = ReceivableProtocol(self.connection.recv, self.wfile.write)
command, args = proto.read_cmd()
+ logger.info('Handling %s request, args=%s', command, args)
- # switch case to handle the specific git command
- if command == 'git-upload-pack':
- cls = UploadPackHandler
- elif command == 'git-receive-pack':
- cls = ReceivePackHandler
- else:
- return
-
- h = cls(self.server.backend, self.rfile.read, self.wfile.write)
+ cls = self.handlers.get(command, None)
+ if not callable(cls):
+ raise GitProtocolError('Invalid service %s' % command)
+ h = cls(self.server.backend, args, proto)
h.handle()
allow_reuse_address = True
serve = SocketServer.TCPServer.serve_forever
- def __init__(self, backend, listen_addr, port=TCP_GIT_PORT):
+ def _make_handler(self, *args, **kwargs):
+ return TCPGitRequestHandler(self.handlers, *args, **kwargs)
+
+ def __init__(self, backend, listen_addr, port=TCP_GIT_PORT, handlers=None):
+ self.handlers = dict(DEFAULT_HANDLERS)
+ if handlers is not None:
+ self.handlers.update(handlers)
self.backend = backend
- SocketServer.TCPServer.__init__(self, (listen_addr, port), TCPGitRequestHandler)
+ logger.info('Listening for TCP connections on %s:%d', listen_addr, port)
+ SocketServer.TCPServer.__init__(self, (listen_addr, port),
+ self._make_handler)
+
+ def verify_request(self, request, client_address):
+ logger.info('Handling request from %s', client_address)
+ return True
+
+ def handle_error(self, request, client_address):
+ logger.exception('Exception happened during processing of request '
+ 'from %s', client_address)
+
+
+def main(argv=sys.argv):
+ """Entry point for starting a TCP git server."""
+ if len(argv) > 1:
+ gitdir = argv[1]
+ else:
+ gitdir = '.'
+
+ log_utils.default_logging_config()
+ backend = DictBackend({'/': Repo(gitdir)})
+ server = TCPGitServer(backend, 'localhost')
+ server.serve_forever()