# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
-# of the License.
+# or (at your option) any later version of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
+
+"""Git smart network protocol server implementation.
+
+For more details on the network protocol, see the
+Documentation/technical directory in the cgit distribution, and in particular:
+ Documentation/technical/protocol-capabilities.txt
+ Documentation/technical/pack-protocol.txt
+"""
+
+
+import collections
import SocketServer
+import tempfile
+
+from dulwich.errors import (
+ ApplyDeltaError,
+ ChecksumMismatch,
+ GitProtocolError,
+ )
+from dulwich.objects import (
+ hex_to_sha,
+ )
+from dulwich.protocol import (
+ Protocol,
+ ProtocolFile,
+ TCP_GIT_PORT,
+ extract_capabilities,
+ extract_want_line_capabilities,
+ SINGLE_ACK,
+ MULTI_ACK,
+ MULTI_ACK_DETAILED,
+ ack_type,
+ )
+from dulwich.repo import (
+ Repo,
+ )
+from dulwich.pack import (
+ write_pack_data,
+ )
class Backend(object):
"""
Get all the refs in the repository
- :return: list of tuple(name, sha)
- """
- raise NotImplementedError
-
- def has_revision(self, sha):
- """
- Is a given sha in this repository?
-
- :return: True or False
+ :return: dict of name -> sha
"""
raise NotImplementedError
"""
raise NotImplementedError
- def generate_pack(self, want, have, write, progress):
+ def fetch_objects(self, determine_wants, graph_walker, progress):
"""
- Generate a pack containing all commits a client is missing
+ Yield the objects required for a list of commits.
- :param want: is a list of sha's the client desires
- :param have: is a list of sha's the client has (allowing us to send the minimal pack)
- :param write: is a callback to write pack data to the client
+ :param determine_wants: is a callback taking a dict of refname -> sha and
+ returning the list of sha's the client wants
+ :param graph_walker: is an object exposing ack(), next() and reset(), used
+ to learn which commits the client already has
:param progress: is a callback to send progress messages to the client
"""
raise NotImplementedError
+class GitBackend(Backend):
+ """Backend that serves a dulwich Repo.
+
+ fetch_objects and get_refs are bound directly to the repository's own
+ methods; apply_pack stores an incoming pack and updates refs.
+ """
+
+ def __init__(self, repo=None):
+ """Wrap repo, or a Repo on a fresh temporary directory when repo is None."""
+ if repo is None:
+ repo = Repo(tempfile.mkdtemp())
+ self.repo = repo
+ self.object_store = self.repo.object_store
+ self.fetch_objects = self.repo.fetch_objects
+ self.get_refs = self.repo.get_refs
+
+ def apply_pack(self, refs, read):
+ """Store the pack sent by the client and apply the requested ref updates.
+
+ :param refs: list of (oldsha, newsha, refname) tuples sent by the client
+ :param read: callback returning up to the requested number of bytes of
+ pack data, and an empty string once the data is exhausted
+ :return: list of (name, message) tuples; the first is ('unpack', 'ok' or
+ the error text), followed by one (refname, 'ok' or error text) per ref
+ """
+ f, commit = self.repo.object_store.add_thin_pack()
+ all_exceptions = (IOError, OSError, ChecksumMismatch, ApplyDeltaError)
+ status = []
+ unpack_error = None
+ # TODO: more informative error messages than just the exception string
+ try:
+ # TODO: decode the pack as we stream to avoid blocking reads beyond
+ # the end of data (when using HTTP/1.1 chunked encoding)
+ while True:
+ data = read(10240)
+ if not data:
+ break
+ f.write(data)
+ except all_exceptions, e:
+ unpack_error = str(e).replace('\n', '')
+ try:
+ commit()
+ except all_exceptions, e:
+ # keep the first error if reading the pack already failed
+ if not unpack_error:
+ unpack_error = str(e).replace('\n', '')
+
+ if unpack_error:
+ status.append(('unpack', unpack_error))
+ else:
+ status.append(('unpack', 'ok'))
+
+ for oldsha, sha, ref in refs:
+ # TODO: check refname
+ ref_error = None
+ try:
+ # an all-zero new sha asks for the ref to be deleted; the test must
+ # look at the sha, not at the ref name
+ if sha == "0" * 40:
+ try:
+ del self.repo.refs[ref]
+ except all_exceptions:
+ ref_error = 'failed to delete'
+ else:
+ try:
+ self.repo.refs[ref] = sha
+ except all_exceptions:
+ ref_error = 'failed to write'
+ except KeyError:
+ ref_error = 'bad ref'
+ if ref_error:
+ status.append((ref, ref_error))
+ else:
+ status.append((ref, 'ok'))
+
+ return status
+
+
class Handler(object):
+ """Smart protocol command handler base class."""
def __init__(self, backend, read, write):
self.backend = backend
- self.read = read
- self.write = write
+ # all wire I/O goes through a Protocol wrapping the read/write callbacks
+ self.proto = Protocol(read, write)
- def read_pkt_line(self):
- """
- Reads a 'pkt line' from the remote git process
+ def capabilities(self):
+ """Return the names from default_capabilities() joined by single spaces.
+
+ default_capabilities() is not defined on this class; UploadPackHandler
+ and ReceivePackHandler each provide one.
+ """
+ return " ".join(self.default_capabilities())
- :return: The next string from the stream
- """
- sizestr = self.read(4)
- if not sizestr:
- return None
- size = int(sizestr, 16)
- if size == 0:
- return None
- return self.read(size-4)
- def write_pkt_line(self, line):
- """
- Sends a 'pkt line' to the remote git process
+class UploadPackHandler(Handler):
+ """Protocol handler for uploading a pack to the client."""
+
+ def __init__(self, backend, read, write,
+ stateless_rpc=False, advertise_refs=False):
+ Handler.__init__(self, backend, read, write)
+ self._client_capabilities = None
+ self._graph_walker = None
+ # both flags are read by ProtocolGraphWalker when it is built in handle()
+ self.stateless_rpc = stateless_rpc
+ self.advertise_refs = advertise_refs
+
+ def default_capabilities(self):
+ """Return the tuple of capability names this handler advertises."""
+ return ("multi_ack_detailed", "multi_ack", "side-band-64k", "thin-pack",
+ "ofs-delta")
+
+ def set_client_capabilities(self, caps):
+ """Store the client's capabilities.
+
+ :raise GitProtocolError: if a capability containing '_ack' was requested
+ but is not one of default_capabilities(); other names are not checked
+ """
+ my_caps = self.default_capabilities()
+ for cap in caps:
+ if '_ack' in cap and cap not in my_caps:
+ raise GitProtocolError('Client asked for capability %s that '
+ 'was not advertised.' % cap)
+ self._client_capabilities = caps
+
+ def get_client_capabilities(self):
+ """Return the capabilities last stored, or None if none were set."""
+ return self._client_capabilities
+
+ client_capabilities = property(get_client_capabilities,
+ set_client_capabilities)
- :param line: A string containing the data to send
- """
- self.write("%04x%s" % (len(line)+4, line))
+ def handle(self):
+ """Negotiate with the client and send it the pack it asked for."""
- def write_sideband(self, channel, blob):
- """
- Write data to the sideband (a git multiplexing method)
+ # sideband channel 2 carries progress text, channel 1 the pack data
+ progress = lambda x: self.proto.write_sideband(2, x)
+ write = lambda x: self.proto.write_sideband(1, x)
- :param channel: int specifying which channel to write to
- :param blob: a blob of data (as a string) to send on this channel
- """
- # a pktline can be a max of 65535. a sideband line can therefore be
- # 65535-5 = 65530
- # WTF: Why have the len in ASCII, but the channel in binary.
- while blob:
- self.write_pkt_line("%s%s" % (chr(channel), blob[:65530]))
- blob = blob[65530:]
+ graph_walker = ProtocolGraphWalker(self)
+ objects_iter = self.backend.fetch_objects(
+ graph_walker.determine_wants, graph_walker, progress)
- def handle(self):
- """
- Deal with the request
+ # Do they want any objects?
+ # NOTE(review): the value returned by fetch_objects must support len() -- confirm
+ if len(objects_iter) == 0:
+ return
+
+ progress("dul-daemon says what\n")
+ progress("counting objects: %d, done.\n" % len(objects_iter))
+ write_pack_data(ProtocolFile(None, write), objects_iter,
+ len(objects_iter))
+ progress("how was that, then?\n")
+ # we are done
+ self.proto.write("0000")
+
+
+class ProtocolGraphWalker(object):
+ """A graph walker that knows the git protocol.
+
+ As a graph walker, this class implements ack(), next(), and reset(). It also
+ contains some base methods for interacting with the wire and walking the
+ commit tree.
+
+ The work of determining which acks to send is passed on to the
+ implementation instance stored in _impl. The reason for this is that we do
+ not know at object creation time what ack level the protocol requires. A
+ call to set_ack_type() is required to set up the implementation, before any
+ calls to next() or ack() are made.
+ """
+ def __init__(self, handler):
+ """Take the object store, protocol and RPC flags from the given handler."""
+ self.handler = handler
+ self.store = handler.backend.object_store
+ self.proto = handler.proto
+ self.stateless_rpc = handler.stateless_rpc
+ self.advertise_refs = handler.advertise_refs
+ self._wants = []
+ self._cached = False
+ self._cache = []
+ self._cache_index = 0
+ self._impl = None
+
+ def determine_wants(self, heads):
+ """Determine the wants for a set of heads.
+
+ The given heads are advertised to the client, who then specifies which
+ refs he wants using 'want' lines. This portion of the protocol is the
+ same regardless of ack type, and in fact is used to set the ack type of
+ the ProtocolGraphWalker.
+
+ :param heads: a dict of refname->SHA1 to advertise
+ :return: a list of SHA1s requested by the client
+ :raise GitProtocolError: if heads is empty, if a line other than 'want'
+ arrives before the flush-pkt, or if a wanted SHA1 was not advertised
"""
- raise NotImplementedError
+ if not heads:
+ raise GitProtocolError('No heads found')
+ values = set(heads.itervalues())
+ if self.advertise_refs or not self.stateless_rpc:
+ for i, (ref, sha) in enumerate(heads.iteritems()):
+ line = "%s %s" % (sha, ref)
+ # the capability list rides on the first advertised ref only
+ if not i:
+ line = "%s\x00%s" % (line, self.handler.capabilities())
+ self.proto.write_pkt_line("%s\n" % line)
+ # TODO: include peeled value of any tags
+
+ # i'm done..
+ self.proto.write_pkt_line(None)
+
+ if self.advertise_refs:
+ return []
+
+ # Now the client will send its want lines
+ want = self.proto.read_pkt_line()
+ if not want:
+ return []
+ line, caps = extract_want_line_capabilities(want)
+ self.handler.client_capabilities = caps
+ self.set_ack_type(ack_type(caps))
+ command, sha = self._split_proto_line(line)
+ want_revs = []
+ while command is not None:
+ if command != 'want':
+ raise GitProtocolError(
+ 'Protocol got unexpected command %s' % command)
+ if sha not in values:
+ raise GitProtocolError(
+ 'Client wants invalid object %s' % sha)
+ want_revs.append(sha)
+ command, sha = self.read_proto_line()
+
+ self.set_wants(want_revs)
+ return want_revs
+
+ def ack(self, have_ref):
+ """Delegate acknowledging have_ref to the ack implementation."""
+ return self._impl.ack(have_ref)
+
+ def reset(self):
+ """Switch next() to replaying the cache from its first entry."""
+ self._cached = True
+ self._cache_index = 0
+
+ def next(self):
+ """Return the next SHA1 the client has, or None when there are no more.
+
+ Until reset() is called the answer comes from the ack implementation;
+ afterwards it is replayed from the cache.
+ """
+ if not self._cached:
+ if not self._impl and self.stateless_rpc:
+ return None
+ return self._impl.next()
+ # read the entry at the current position before advancing, so that
+ # entry 0 is returned and the index never runs past the end
+ if self._cache_index >= len(self._cache):
+ return None
+ have = self._cache[self._cache_index]
+ self._cache_index += 1
+ return have
+
+ def _split_proto_line(self, line):
+ """Parse one line into ('done', None) or ('want'|'have', hex sha).
+
+ :raise GitProtocolError: if the line has any other form or the sha is
+ rejected by hex_to_sha
+ """
+ fields = line.rstrip('\n').split(' ', 1)
+ if len(fields) == 1 and fields[0] == 'done':
+ return ('done', None)
+ elif len(fields) == 2 and fields[0] in ('want', 'have'):
+ try:
+ # called only to validate; the hex form is what gets returned
+ hex_to_sha(fields[1])
+ return tuple(fields)
+ except (TypeError, AssertionError), e:
+ raise GitProtocolError(e)
+ raise GitProtocolError('Received invalid line from client:\n%s' % line)
+
+ def read_proto_line(self):
+ """Read a line from the wire.
+
+ :return: a tuple having one of the following forms:
+ ('want', obj_id)
+ ('have', obj_id)
+ ('done', None)
+ (None, None) (for a flush-pkt)
+
+ :raise GitProtocolError: if the line cannot be parsed into one of the
+ possible return values.
+ """
+ line = self.proto.read_pkt_line()
+ if not line:
+ return (None, None)
+ return self._split_proto_line(line)
-class UploadPackHandler(Handler):
+ def send_ack(self, sha, ack_type=''):
+ """Write 'ACK <sha>' followed by the ack type, if one is given."""
+ if ack_type:
+ ack_type = ' %s' % ack_type
+ self.proto.write_pkt_line('ACK %s%s\n' % (sha, ack_type))
- def handle(self):
- refs = self.backend.get_refs()
+ def send_nak(self):
+ """Write a NAK line."""
+ self.proto.write_pkt_line('NAK\n')
- if refs:
- self.write_pkt_line("%s %s\x00multi_ack side-band-64k thin-pack ofs-delta\n" % (refs[0][1], refs[0][0]))
- for i in range(1, len(refs)):
- ref = refs[i]
- self.write_pkt_line("%s %s\n" % (ref[1], ref[0]))
+ def set_wants(self, wants):
+ """Remember the wants that all_wants_satisfied() checks."""
+ self._wants = wants
- # i'm done...
- self.write("0000")
+ def _is_satisfied(self, haves, want, earliest):
+ """Check whether a want is satisfied by a set of haves.
- # Now client will either send "0000", meaning that it doesnt want to pull.
- # or it will start sending want want want commands
- want = self.read_pkt_line()
- if want == None:
- return
-
- # Keep reading the list of demands until we hit another "0000"
- want_revs = []
- while want and want[:4] == 'want':
- want_rev = want[5:45]
- # FIXME: This check probably isnt needed?
- if self.backend.has_revision(want_rev):
- want_revs.append(want_rev)
- want = self.read_pkt_line()
-
- # Client will now tell us which commits it already has - if we have them we ACK them
- # this allows client to stop looking at that commits parents (main reason why git pull is fast)
- last_sha = None
- have_revs = []
- have = self.read_pkt_line()
- while have and have[:4] == 'have':
- have_ref = have[6:46]
- if self.backend.has_revision(hav_rev):
- self.write_pkt_line("ACK %s continue\n" % sha)
- last_sha = sha
- have_revs.append(rev_id)
- have = self.read_pkt_line()
-
- # At some point client will stop sending commits and will tell us it is done
- assert(have[:4] == "done")
-
- # Oddness: Git seems to resend the last ACK, without the "continue" statement
- if last_sha:
- self.write_pkt_line("ACK %s\n" % last_sha)
-
- # The exchange finishes with a NAK
- self.write_pkt_line("NAK\n")
-
- self.backend.generate_pack(want_revs, have_revs, lambda x: self.write_sideband(1, x), lambda x: self.write_sideband(2, x))
+ A want, typically a branch tip, is "satisfied" only if there exists a
+ path back from that want to one of the haves.
- # we are done
- self.write("0000")
+ :param haves: A set of commits we know the client has.
+ :param want: The want to check satisfaction for.
+ :param earliest: A timestamp beyond which the search for haves will be
+ terminated, presumably because we're searching too far down the
+ wrong branch.
+ """
+ o = self.store[want]
+ pending = collections.deque([o])
+ while pending:
+ commit = pending.popleft()
+ if commit.id in haves:
+ return True
+ if not getattr(commit, 'get_parents', None):
+ # non-commit wants are assumed to be satisfied
+ # NOTE(review): this only skips the object; a want with no path to
+ # a have still ends in "return False" below -- confirm intent
+ continue
+ for parent in commit.get_parents():
+ parent_obj = self.store[parent]
+ # TODO: handle parents with later commit times than children
+ if parent_obj.commit_time >= earliest:
+ pending.append(parent_obj)
+ return False
+
+ def all_wants_satisfied(self, haves):
+ """Check whether all the current wants are satisfied by a set of haves.
+
+ :param haves: A set of commits we know the client has.
+ :return: True if every want has a path back to one of the haves; with no
+ haves at all, True only when there are no wants either.
+ :note: Wants are specified with set_wants rather than passed in since
+ in the current interface they are determined outside this class.
+ """
+ haves = set(haves)
+ if not haves:
+ # min() of an empty list raises ValueError; without any have no want
+ # can be reached, so only an empty want list is satisfied
+ return not self._wants
+ earliest = min([self.store[h].commit_time for h in haves])
+ for want in self._wants:
+ if not self._is_satisfied(haves, want, earliest):
+ return False
+ return True
+
+ def set_ack_type(self, ack_type):
+ """Instantiate the ack implementation matching ack_type into _impl."""
+ impl_classes = {
+ MULTI_ACK: MultiAckGraphWalkerImpl,
+ MULTI_ACK_DETAILED: MultiAckDetailedGraphWalkerImpl,
+ SINGLE_ACK: SingleAckGraphWalkerImpl,
+ }
+ self._impl = impl_classes[ack_type](self)
+
+
+class SingleAckGraphWalkerImpl(object):
+ """Graph walker implementation that speaks the single-ack protocol."""
+
+ def __init__(self, walker):
+ self.walker = walker
+ # set once the single ACK has been written
+ self._sent_ack = False
+
+ def ack(self, have_ref):
+ """Send an ACK for have_ref unless one has already been sent."""
+ if not self._sent_ack:
+ self.walker.send_ack(have_ref)
+ self._sent_ack = True
+
+ def next(self):
+ """Read one line and return the sha of a 'have', otherwise None.
+
+ On a flush-pkt or 'done', a NAK is sent first if no ACK has gone out.
+ """
+ command, sha = self.walker.read_proto_line()
+ if command in (None, 'done'):
+ if not self._sent_ack:
+ self.walker.send_nak()
+ return None
+ elif command == 'have':
+ return sha
+
+
+class MultiAckGraphWalkerImpl(object):
+ """Graph walker implementation that speaks the multi-ack protocol."""
+
+ def __init__(self, walker):
+ self.walker = walker
+ # becomes True once the common commits satisfy every want
+ self._found_base = False
+ # shas acknowledged so far, in the order ack() received them
+ self._common = []
+
+ def ack(self, have_ref):
+ """Record have_ref as common and send 'continue' ACKs until a base is found."""
+ self._common.append(have_ref)
+ if not self._found_base:
+ self.walker.send_ack(have_ref, 'continue')
+ if self.walker.all_wants_satisfied(self._common):
+ self._found_base = True
+ # else we blind ack within next
+
+ def next(self):
+ """Return the sha of the next 'have' line, or None once 'done' arrives.
+
+ A flush-pkt is answered with a NAK and reading continues. 'done' is
+ answered with an ACK of the last common sha, or a NAK if there is none.
+ """
+ while True:
+ command, sha = self.walker.read_proto_line()
+ if command is None:
+ self.walker.send_nak()
+ # in multi-ack mode, a flush-pkt indicates the client wants to
+ # flush but more have lines are still coming
+ continue
+ elif command == 'done':
+ # don't nak unless no common commits were found, even if not
+ # everything is satisfied
+ if self._common:
+ self.walker.send_ack(self._common[-1])
+ else:
+ self.walker.send_nak()
+ return None
+ elif command == 'have':
+ if self._found_base:
+ # blind ack
+ self.walker.send_ack(sha, 'continue')
+ return sha
+
+
+class MultiAckDetailedGraphWalkerImpl(object):
+ """Graph walker implementation speaking the multi-ack-detailed protocol."""
+
+ def __init__(self, walker):
+ self.walker = walker
+ # becomes True once the common commits satisfy every want
+ self._found_base = False
+ # shas acknowledged so far, in the order ack() received them
+ self._common = []
+
+ def ack(self, have_ref):
+ """Record have_ref as common; send 'common' ACKs, then 'ready' once satisfied."""
+ self._common.append(have_ref)
+ if not self._found_base:
+ self.walker.send_ack(have_ref, 'common')
+ if self.walker.all_wants_satisfied(self._common):
+ self._found_base = True
+ self.walker.send_ack(have_ref, 'ready')
+ # else we blind ack within next
+
+ def next(self):
+ """Return the sha of the next 'have' line, or None when the client stops.
+
+ A flush-pkt is answered with a NAK; reading then continues unless the
+ walker is in stateless-RPC mode, in which case None is returned. 'done'
+ is answered with an ACK of the last common sha, or a NAK if there is none.
+ """
+ while True:
+ command, sha = self.walker.read_proto_line()
+ if command is None:
+ self.walker.send_nak()
+ if self.walker.stateless_rpc:
+ return None
+ continue
+ elif command == 'done':
+ # don't nak unless no common commits were found, even if not
+ # everything is satisfied
+ if self._common:
+ self.walker.send_ack(self._common[-1])
+ else:
+ self.walker.send_nak()
+ return None
+ elif command == 'have':
+ if self._found_base:
+ # blind ack; can happen if the client has more requests
+ # inflight
+ self.walker.send_ack(sha, 'ready')
+ return sha
class ReceivePackHandler(Handler):
+ """Protocol handler for downloading a pack from the client."""
+
+ def __init__(self, backend, read, write,
+ stateless_rpc=False, advertise_refs=False):
+ Handler.__init__(self, backend, read, write)
+ # handle() reads these two attributes under exactly these names
+ self.stateless_rpc = stateless_rpc
+ self.advertise_refs = advertise_refs
+
+ def default_capabilities(self):
+ """Return the tuple of capability names this handler advertises."""
+ return ("report-status", "delete-refs")
def handle(self):
+ """Advertise the refs, then read ref updates and a pack and apply them.
+
+ The advertisement is skipped for a stateless-RPC request that did not
+ ask for it, and nothing is read when only an advertisement was asked
+ for. A status report is written back if the client sent the
+ report-status capability.
+ """
- refs = self.backend.get_refs()
+ refs = self.backend.get_refs().items()
- if refs:
- self.write_pkt_line("%s %s\x00multi_ack side-band-64k thin-pack ofs-delta\n" % (refs[0][1], refs[0][0]))
- for i in range(1, len(refs)):
- ref = refs[i]
- self.write_pkt_line("%s %s\n" % (ref[1], ref[0]))
+ if self.advertise_refs or not self.stateless_rpc:
+ if refs:
+ self.proto.write_pkt_line("%s %s\x00%s\n" % (refs[0][1], refs[0][0], self.capabilities()))
+ for i in range(1, len(refs)):
+ ref = refs[i]
+ self.proto.write_pkt_line("%s %s\n" % (ref[1], ref[0]))
+ else:
+ # no refs yet: the capability list still has to follow a NUL, as it
+ # does on the first ref above, or the client reads it as ref name
+ self.proto.write_pkt_line("0000000000000000000000000000000000000000 capabilities^{}\x00%s" % self.capabilities())
- self.write("0000")
+ self.proto.write("0000")
+ if self.advertise_refs:
+ return
client_refs = []
- ref = self.read_pkt_line()
+ ref = self.proto.read_pkt_line()
+
+ # if ref is none then client doesnt want to send us anything..
+ if ref is None:
+ return
+
+ ref, client_capabilities = extract_capabilities(ref)
+
+ # client will now send us a list of (oldsha, newsha, ref)
while ref:
client_refs.append(ref.split())
- ref = self.read_pkt_line()
-
- # client might hang up without sending us any refs
- if len(client_refs) == 0:
- return None
+ ref = self.proto.read_pkt_line()
# backend can now deal with this refs and read a pack using self.read
- self.backend.apply_pack(client_refs, self.read)
+ status = self.backend.apply_pack(client_refs, self.proto.read)
+ # when we have read all the pack from the client, send a status report
+ # if the client asked for it
+ if 'report-status' in client_capabilities:
+ for name, msg in status:
+ if name == 'unpack':
+ self.proto.write_pkt_line('unpack %s\n' % msg)
+ elif msg == 'ok':
+ self.proto.write_pkt_line('ok %s\n' % name)
+ else:
+ self.proto.write_pkt_line('ng %s %s\n' % (name, msg))
+ self.proto.write_pkt_line(None)
-class TCPGitRequestHandler(SocketServer.StreamRequestHandler, Handler):
- def __init__(self, request, client_address, server):
- SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)
+class TCPGitRequestHandler(SocketServer.StreamRequestHandler):
def handle(self):
- #FIXME: StreamRequestHandler seems to be the thing that calls handle(),
- #so we can't call this in a sane place??
- Handler.__init__(self, self.server.backend, self.rfile.read, self.wfile.write)
-
- request = self.read_pkt_line()
-
- # up until the space is the command to run, everything after is parameters
- splice_point = request.find(' ')
- command, params = request[:splice_point], request[splice_point+1:]
-
- # params are null seperated
- params = params.split(chr(0))
+ proto = Protocol(self.rfile.read, self.wfile.write)
+ command, args = proto.read_cmd()
# switch case to handle the specific git command
if command == 'git-upload-pack':
else:
return
- h = cls(self.backend, self.read, self.write)
+ h = cls(self.server.backend, self.rfile.read, self.wfile.write)
h.handle()
allow_reuse_address = True
serve = SocketServer.TCPServer.serve_forever
- def __init__(self, backend, addr):
+ def __init__(self, backend, listen_addr, port=TCP_GIT_PORT):
+ """Create a TCP server on (listen_addr, port) using TCPGitRequestHandler.
+
+ :param backend: stored on the server; the request handler passes it to
+ the command handler it creates
+ :param listen_addr: address to bind to
+ :param port: port to bind to, TCP_GIT_PORT unless given
+ """
self.backend = backend
- SocketServer.TCPServer.__init__(self, addr, TCPGitRequestHandler)
-
-
+ SocketServer.TCPServer.__init__(self, (listen_addr, port), TCPGitRequestHandler)