"""Tests for the smart protocol server."""
-from cStringIO import StringIO
from unittest import TestCase
from dulwich.errors import (
GitProtocolError,
)
from dulwich.server import (
- UploadPackHandler,
+ Backend,
+ DictBackend,
+ BackendRepo,
Handler,
- ProtocolGraphWalker,
- SingleAckGraphWalkerImpl,
MultiAckGraphWalkerImpl,
MultiAckDetailedGraphWalkerImpl,
+ ProtocolGraphWalker,
+ SingleAckGraphWalkerImpl,
+ UploadPackHandler,
)
THREE = '3' * 40
FOUR = '4' * 40
FIVE = '5' * 40
+SIX = '6' * 40
class TestProto(object):
class HandlerTestCase(TestCase):
def setUp(self):
- self._handler = Handler(None, None, None)
+ self._handler = Handler(Backend(), None, None)
self._handler.capabilities = lambda: ('cap1', 'cap2', 'cap3')
+ self._handler.required_capabilities = lambda: ('cap2',)
def assertSucceeds(self, func, *args, **kwargs):
try:
func(*args, **kwargs)
- except GitProtocolError:
- self.fail()
+ except GitProtocolError, e:
+ self.fail(e)
def test_capability_line(self):
self.assertEquals('cap1 cap2 cap3', self._handler.capability_line())
def test_set_client_capabilities(self):
set_caps = self._handler.set_client_capabilities
- self.assertSucceeds(set_caps, [])
self.assertSucceeds(set_caps, ['cap2'])
self.assertSucceeds(set_caps, ['cap1', 'cap2'])
+
# different order
self.assertSucceeds(set_caps, ['cap3', 'cap1', 'cap2'])
- self.assertRaises(GitProtocolError, set_caps, ['capxxx', 'cap1'])
+
+ # error cases
+ self.assertRaises(GitProtocolError, set_caps, ['capxxx', 'cap2'])
+ self.assertRaises(GitProtocolError, set_caps, ['cap1', 'cap3'])
+
+ # ignore innocuous but unknown capabilities
+ self.assertRaises(GitProtocolError, set_caps, ['cap2', 'ignoreme'])
+ self.assertFalse('ignoreme' in self._handler.capabilities())
+ self._handler.innocuous_capabilities = lambda: ('ignoreme',)
+ self.assertSucceeds(set_caps, ['cap2', 'ignoreme'])
def test_has_capability(self):
self.assertRaises(GitProtocolError, self._handler.has_capability, 'cap')
class UploadPackHandlerTestCase(TestCase):
def setUp(self):
- self._handler = UploadPackHandler(None, None, None)
+ self._backend = DictBackend({"/": BackendRepo()})
+ self._handler = UploadPackHandler(self._backend,
+ ["/", "host=lolcathost"], None, None)
self._handler.proto = TestProto()
def test_progress(self):
- self._handler.set_client_capabilities([])
+ caps = self._handler.required_capabilities()
+ self._handler.set_client_capabilities(caps)
self._handler.progress('first message')
self._handler.progress('second message')
self.assertEqual('first message',
self.assertEqual(None, self._handler.proto.get_received_line(2))
def test_no_progress(self):
- self._handler.set_client_capabilities(['no-progress'])
+ caps = list(self._handler.required_capabilities()) + ['no-progress']
+ self._handler.set_client_capabilities(caps)
self._handler.progress('first message')
self._handler.progress('second message')
self.assertEqual(None, self._handler.proto.get_received_line(2))
+ def test_get_tagged(self):
+ refs = {
+ 'refs/tags/tag1': ONE,
+ 'refs/tags/tag2': TWO,
+ 'refs/heads/master': FOUR, # not a tag, no peeled value
+ }
+ peeled = {
+ 'refs/tags/tag1': '1234',
+ 'refs/tags/tag2': '5678',
+ }
+
+ class TestRepo(object):
+ def get_peeled(self, ref):
+ return peeled.get(ref, refs[ref])
+
+ caps = list(self._handler.required_capabilities()) + ['include-tag']
+ self._handler.set_client_capabilities(caps)
+ self.assertEquals({'1234': ONE, '5678': TWO},
+ self._handler.get_tagged(refs, repo=TestRepo()))
+
+ # non-include-tag case
+ caps = self._handler.required_capabilities()
+ self._handler.set_client_capabilities(caps)
+ self.assertEquals({}, self._handler.get_tagged(refs, repo=TestRepo()))
+
class TestCommit(object):
def __init__(self, sha, parents, commit_time):
self.id = sha
- self._parents = parents
+ self.parents = parents
self.commit_time = commit_time
-
- def get_parents(self):
- return self._parents
+ self.type_name = "commit"
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._sha)
+class TestRepo(object):
+ def __init__(self):
+ self.peeled = {}
+
+ def get_peeled(self, name):
+ return self.peeled[name]
+
+
class TestBackend(object):
- def __init__(self, objects):
+ def __init__(self, repo, objects):
+ self.repo = repo
self.object_store = objects
class TestUploadPackHandler(Handler):
def __init__(self, objects, proto):
- self.backend = TestBackend(objects)
+ self.backend = TestBackend(TestRepo(), objects)
self.proto = proto
self.stateless_rpc = False
self.advertise_refs = False
FOUR: TestCommit(FOUR, [TWO], 444),
FIVE: TestCommit(FIVE, [THREE], 555),
}
+
self._walker = ProtocolGraphWalker(
- TestUploadPackHandler(self._objects, TestProto()))
+ TestUploadPackHandler(self._objects, TestProto()),
+ self._objects, None)
def test_is_satisfied_no_haves(self):
self.assertFalse(self._walker._is_satisfied([], ONE, 0))
'want %s' % TWO,
])
heads = {'ref1': ONE, 'ref2': TWO, 'ref3': THREE}
+ self._walker.get_peeled = heads.get
self.assertEquals([ONE, TWO], self._walker.determine_wants(heads))
self._walker.proto.set_output(['want %s multi_ack' % FOUR])
self._walker.proto.set_output(['want %s multi_ack' % FOUR])
self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
+ def test_determine_wants_advertisement(self):
+ self._walker.proto.set_output([])
+ # advertise branch tips plus tag
+ heads = {'ref4': FOUR, 'ref5': FIVE, 'tag6': SIX}
+ peeled = {'ref4': FOUR, 'ref5': FIVE, 'tag6': FIVE}
+ self._walker.get_peeled = peeled.get
+ self._walker.determine_wants(heads)
+ lines = []
+ while True:
+ line = self._walker.proto.get_received_line()
+ if line == 'None':
+ break
+ # strip capabilities list if present
+ if '\x00' in line:
+ line = line[:line.index('\x00')]
+ lines.append(line.rstrip())
+
+ self.assertEquals([
+ '%s ref4' % FOUR,
+ '%s ref5' % FIVE,
+ '%s tag6^{}' % FIVE,
+ '%s tag6' % SIX,
+ ], sorted(lines))
+
+ # ensure peeled tag was advertised immediately following tag
+ for i, line in enumerate(lines):
+ if line.endswith(' tag6'):
+ self.assertEquals('%s tag6^{}' % FIVE, lines[i+1])
+
# TODO: test commit time cutoff