1 # test_server.py -- Tests for the git server
2 # Copyright (C) 2010 Google, Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; version 2
7 # or (at your option) any later version of the License.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 """Tests for the smart protocol server."""
22 from unittest import TestCase
24 from dulwich.errors import (
27 from dulwich.server import (
32 MultiAckGraphWalkerImpl,
33 MultiAckDetailedGraphWalkerImpl,
35 SingleAckGraphWalkerImpl,
48 class TestProto(object):
52 self._received = {0: [], 1: [], 2: [], 3: []}
def set_output(self, output_lines):
    """Replace the queued output with normalised copies of ``output_lines``.

    Each entry has its trailing whitespace stripped and is then terminated
    with exactly one newline before being stored on ``self._output``.
    """
    queued = []
    for raw_line in output_lines:
        queued.append('%s\n' % raw_line.rstrip())
    self._output = queued
57 def read_pkt_line(self):
59 return self._output.pop(0)
def write_sideband(self, band, data):
    """Record ``data`` in the list of items received on channel ``band``."""
    channel = self._received[band]
    channel.append(data)
66 def write_pkt_line(self, data):
69 self._received[0].append(data)
71 def get_received_line(self, band=0):
72 lines = self._received[band]
79 class HandlerTestCase(TestCase):
82 self._handler = Handler(Backend(), None)
83 self._handler.capabilities = lambda: ('cap1', 'cap2', 'cap3')
84 self._handler.required_capabilities = lambda: ('cap2',)
86 def assertSucceeds(self, func, *args, **kwargs):
89 except GitProtocolError, e:
92 def test_capability_line(self):
93 self.assertEquals('cap1 cap2 cap3', self._handler.capability_line())
95 def test_set_client_capabilities(self):
96 set_caps = self._handler.set_client_capabilities
97 self.assertSucceeds(set_caps, ['cap2'])
98 self.assertSucceeds(set_caps, ['cap1', 'cap2'])
101 self.assertSucceeds(set_caps, ['cap3', 'cap1', 'cap2'])
104 self.assertRaises(GitProtocolError, set_caps, ['capxxx', 'cap2'])
105 self.assertRaises(GitProtocolError, set_caps, ['cap1', 'cap3'])
107 # ignore innocuous but unknown capabilities
108 self.assertRaises(GitProtocolError, set_caps, ['cap2', 'ignoreme'])
109 self.assertFalse('ignoreme' in self._handler.capabilities())
110 self._handler.innocuous_capabilities = lambda: ('ignoreme',)
111 self.assertSucceeds(set_caps, ['cap2', 'ignoreme'])
113 def test_has_capability(self):
114 self.assertRaises(GitProtocolError, self._handler.has_capability, 'cap')
115 caps = self._handler.capabilities()
116 self._handler.set_client_capabilities(caps)
118 self.assertTrue(self._handler.has_capability(cap))
119 self.assertFalse(self._handler.has_capability('capxxx'))
122 class UploadPackHandlerTestCase(TestCase):
125 self._backend = DictBackend({"/": BackendRepo()})
126 self._handler = UploadPackHandler(self._backend,
127 ["/", "host=lolcathost"], None, None)
128 self._handler.proto = TestProto()
130 def test_progress(self):
131 caps = self._handler.required_capabilities()
132 self._handler.set_client_capabilities(caps)
133 self._handler.progress('first message')
134 self._handler.progress('second message')
135 self.assertEqual('first message',
136 self._handler.proto.get_received_line(2))
137 self.assertEqual('second message',
138 self._handler.proto.get_received_line(2))
139 self.assertEqual(None, self._handler.proto.get_received_line(2))
141 def test_no_progress(self):
142 caps = list(self._handler.required_capabilities()) + ['no-progress']
143 self._handler.set_client_capabilities(caps)
144 self._handler.progress('first message')
145 self._handler.progress('second message')
146 self.assertEqual(None, self._handler.proto.get_received_line(2))
148 def test_get_tagged(self):
150 'refs/tags/tag1': ONE,
151 'refs/tags/tag2': TWO,
152 'refs/heads/master': FOUR, # not a tag, no peeled value
155 'refs/tags/tag1': '1234',
156 'refs/tags/tag2': '5678',
159 class TestRepo(object):
160 def get_peeled(self, ref):
161 return peeled.get(ref, refs[ref])
163 caps = list(self._handler.required_capabilities()) + ['include-tag']
164 self._handler.set_client_capabilities(caps)
165 self.assertEquals({'1234': ONE, '5678': TWO},
166 self._handler.get_tagged(refs, repo=TestRepo()))
168 # non-include-tag case
169 caps = self._handler.required_capabilities()
170 self._handler.set_client_capabilities(caps)
171 self.assertEquals({}, self._handler.get_tagged(refs, repo=TestRepo()))
174 class TestCommit(object):
176 def __init__(self, sha, parents, commit_time):
178 self.parents = parents
179 self.commit_time = commit_time
180 self.type_name = "commit"
183 return '%s(%s)' % (self.__class__.__name__, self._sha)
186 class TestRepo(object):
190 def get_peeled(self, name):
191 return self.peeled[name]
194 class TestBackend(object):
196 def __init__(self, repo, objects):
198 self.object_store = objects
201 class TestUploadPackHandler(Handler):
203 def __init__(self, objects, proto):
204 self.backend = TestBackend(TestRepo(), objects)
206 self.stateless_rpc = False
207 self.advertise_refs = False
def capabilities(self):
    """Return the fixed capability tuple of this test handler.

    The tuple contains the single entry 'multi_ack'.
    """
    supported = ('multi_ack',)
    return supported
213 class ProtocolGraphWalkerTestCase(TestCase):
216 # Create the following commit tree:
221 ONE: TestCommit(ONE, [], 111),
222 TWO: TestCommit(TWO, [ONE], 222),
223 THREE: TestCommit(THREE, [ONE], 333),
224 FOUR: TestCommit(FOUR, [TWO], 444),
225 FIVE: TestCommit(FIVE, [THREE], 555),
228 self._walker = ProtocolGraphWalker(
229 TestUploadPackHandler(self._objects, TestProto()),
def test_is_satisfied_no_haves(self):
    """With an empty haves list, none of ONE, TWO or THREE is satisfied."""
    for want in (ONE, TWO, THREE):
        self.assertFalse(self._walker._is_satisfied([], want, 0))
def test_is_satisfied_have_root(self):
    """Having ONE (the parentless commit) satisfies ONE, TWO and THREE."""
    for want in (ONE, TWO, THREE):
        self.assertTrue(self._walker._is_satisfied([ONE], want, 0))
242 def test_is_satisfied_have_branch(self):
243 self.assertTrue(self._walker._is_satisfied([TWO], TWO, 0))
245 self.assertFalse(self._walker._is_satisfied([TWO], THREE, 0))
def test_all_wants_satisfied(self):
    """Check all_wants_satisfied for several haves with wants FOUR, FIVE.

    Each case pairs a haves list with the expected truth of the result.
    """
    walker = self._walker
    walker.set_wants([FOUR, FIVE])

    cases = (
        # trivial case: wants == haves
        ([FOUR, FIVE], True),
        # cases that require walking the commit tree
        ([ONE], True),
        ([TWO], False),
        ([THREE], False),
        ([TWO, THREE], True),
    )
    for haves, expected in cases:
        if expected:
            self.assertTrue(walker.all_wants_satisfied(haves))
        else:
            self.assertFalse(walker.all_wants_satisfied(haves))
257 def test_read_proto_line(self):
258 self._walker.proto.set_output([
266 self.assertEquals(('want', ONE), self._walker.read_proto_line())
267 self.assertEquals(('want', TWO), self._walker.read_proto_line())
268 self.assertEquals(('have', THREE), self._walker.read_proto_line())
269 self.assertRaises(GitProtocolError, self._walker.read_proto_line)
270 self.assertRaises(GitProtocolError, self._walker.read_proto_line)
271 self.assertEquals(('done', None), self._walker.read_proto_line())
272 self.assertEquals((None, None), self._walker.read_proto_line())
274 def test_determine_wants(self):
275 self.assertRaises(GitProtocolError, self._walker.determine_wants, {})
277 self._walker.proto.set_output([
278 'want %s multi_ack' % ONE,
281 heads = {'ref1': ONE, 'ref2': TWO, 'ref3': THREE}
282 self._walker.get_peeled = heads.get
283 self.assertEquals([ONE, TWO], self._walker.determine_wants(heads))
285 self._walker.proto.set_output(['want %s multi_ack' % FOUR])
286 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
288 self._walker.proto.set_output([])
289 self.assertEquals([], self._walker.determine_wants(heads))
291 self._walker.proto.set_output(['want %s multi_ack' % ONE, 'foo'])
292 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
294 self._walker.proto.set_output(['want %s multi_ack' % FOUR])
295 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
297 def test_determine_wants_advertisement(self):
298 self._walker.proto.set_output([])
299 # advertise branch tips plus tag
300 heads = {'ref4': FOUR, 'ref5': FIVE, 'tag6': SIX}
301 peeled = {'ref4': FOUR, 'ref5': FIVE, 'tag6': FIVE}
302 self._walker.get_peeled = peeled.get
303 self._walker.determine_wants(heads)
306 line = self._walker.proto.get_received_line()
309 # strip capabilities list if present
311 line = line[:line.index('\x00')]
312 lines.append(line.rstrip())
321 # ensure peeled tag was advertised immediately following tag
322 for i, line in enumerate(lines):
323 if line.endswith(' tag6'):
324 self.assertEquals('%s tag6^{}' % FIVE, lines[i+1])
326 # TODO: test commit time cutoff
329 class TestProtocolGraphWalker(object):
335 self.stateless_rpc = False
336 self.advertise_refs = False
def read_proto_line(self):
    """Remove and return the first entry of ``self.lines``."""
    pending = self.lines
    next_line = pending.pop(0)
    return next_line
def send_ack(self, sha, ack_type=''):
    """Record the ack as a ``(sha, ack_type)`` tuple on ``self.acks``."""
    entry = (sha, ack_type)
    self.acks.append(entry)
345 self.acks.append((None, 'nak'))
347 def all_wants_satisfied(self, haves):
353 return self.acks.pop(0)
356 class AckGraphWalkerImplTestCase(TestCase):
357 """Base setup and asserts for AckGraphWalker tests."""
360 self._walker = TestProtocolGraphWalker()
361 self._walker.lines = [
367 self._impl = self.impl_cls(self._walker)
def assertNoAck(self):
    """Assert that popping the next ack from the walker yields None."""
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(None, self._walker.pop_ack())
372 def assertAcks(self, acks):
373 for sha, ack_type in acks:
374 self.assertEquals((sha, ack_type), self._walker.pop_ack())
def assertAck(self, sha, ack_type=''):
    """Assert a single ack by passing one ``(sha, ack_type)`` pair to assertAcks."""
    expected = [(sha, ack_type)]
    self.assertAcks(expected)
381 self.assertAck(None, 'nak')
def assertNextEquals(self, sha):
    """Assert that the next value returned by ``self._impl.next()`` is ``sha``."""
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(sha, self._impl.next())
387 class SingleAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
389 impl_cls = SingleAckGraphWalkerImpl
391 def test_single_ack(self):
392 self.assertNextEquals(TWO)
395 self.assertNextEquals(ONE)
396 self._walker.done = True
400 self.assertNextEquals(THREE)
401 self._impl.ack(THREE)
404 self.assertNextEquals(None)
407 def test_single_ack_flush(self):
408 # same as ack test but ends with a flush-pkt instead of done
409 self._walker.lines[-1] = (None, None)
411 self.assertNextEquals(TWO)
414 self.assertNextEquals(ONE)
415 self._walker.done = True
419 self.assertNextEquals(THREE)
422 self.assertNextEquals(None)
425 def test_single_ack_nak(self):
426 self.assertNextEquals(TWO)
429 self.assertNextEquals(ONE)
432 self.assertNextEquals(THREE)
435 self.assertNextEquals(None)
438 def test_single_ack_nak_flush(self):
439 # same as nak test but ends with a flush-pkt instead of done
440 self._walker.lines[-1] = (None, None)
442 self.assertNextEquals(TWO)
445 self.assertNextEquals(ONE)
448 self.assertNextEquals(THREE)
451 self.assertNextEquals(None)
455 class MultiAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
457 impl_cls = MultiAckGraphWalkerImpl
459 def test_multi_ack(self):
460 self.assertNextEquals(TWO)
463 self.assertNextEquals(ONE)
464 self._walker.done = True
466 self.assertAck(ONE, 'continue')
468 self.assertNextEquals(THREE)
469 self._impl.ack(THREE)
470 self.assertAck(THREE, 'continue')
472 self.assertNextEquals(None)
473 self.assertAck(THREE)
475 def test_multi_ack_partial(self):
476 self.assertNextEquals(TWO)
479 self.assertNextEquals(ONE)
481 self.assertAck(ONE, 'continue')
483 self.assertNextEquals(THREE)
486 self.assertNextEquals(None)
487 # done, re-send ack of last common
490 def test_multi_ack_flush(self):
491 self._walker.lines = [
498 self.assertNextEquals(TWO)
501 self.assertNextEquals(ONE)
502 self.assertNak() # nak the flush-pkt
504 self._walker.done = True
506 self.assertAck(ONE, 'continue')
508 self.assertNextEquals(THREE)
509 self._impl.ack(THREE)
510 self.assertAck(THREE, 'continue')
512 self.assertNextEquals(None)
513 self.assertAck(THREE)
515 def test_multi_ack_nak(self):
516 self.assertNextEquals(TWO)
519 self.assertNextEquals(ONE)
522 self.assertNextEquals(THREE)
525 self.assertNextEquals(None)
529 class MultiAckDetailedGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
531 impl_cls = MultiAckDetailedGraphWalkerImpl
533 def test_multi_ack(self):
534 self.assertNextEquals(TWO)
537 self.assertNextEquals(ONE)
538 self._walker.done = True
540 self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
542 self.assertNextEquals(THREE)
543 self._impl.ack(THREE)
544 self.assertAck(THREE, 'ready')
546 self.assertNextEquals(None)
547 self.assertAck(THREE)
549 def test_multi_ack_partial(self):
550 self.assertNextEquals(TWO)
553 self.assertNextEquals(ONE)
555 self.assertAck(ONE, 'common')
557 self.assertNextEquals(THREE)
560 self.assertNextEquals(None)
561 # done, re-send ack of last common
564 def test_multi_ack_flush(self):
565 # same as ack test but contains a flush-pkt in the middle
566 self._walker.lines = [
573 self.assertNextEquals(TWO)
576 self.assertNextEquals(ONE)
577 self.assertNak() # nak the flush-pkt
579 self._walker.done = True
581 self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
583 self.assertNextEquals(THREE)
584 self._impl.ack(THREE)
585 self.assertAck(THREE, 'ready')
587 self.assertNextEquals(None)
588 self.assertAck(THREE)
590 def test_multi_ack_nak(self):
591 self.assertNextEquals(TWO)
594 self.assertNextEquals(ONE)
597 self.assertNextEquals(THREE)
600 self.assertNextEquals(None)
603 def test_multi_ack_nak_flush(self):
604 # same as nak test but contains a flush-pkt in the middle
605 self._walker.lines = [
612 self.assertNextEquals(TWO)
615 self.assertNextEquals(ONE)
618 self.assertNextEquals(THREE)
621 self.assertNextEquals(None)
624 def test_multi_ack_stateless(self):
625 # transmission ends with a flush-pkt
626 self._walker.lines[-1] = (None, None)
627 self._walker.stateless_rpc = True
629 self.assertNextEquals(TWO)
632 self.assertNextEquals(ONE)
635 self.assertNextEquals(THREE)
638 self.assertNextEquals(None)