# test_server.py -- Tests for the git server
# Copyright (C) 2010 Google, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# or (at your option) any later version of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
20 """Tests for the smart protocol server."""
23 from cStringIO import StringIO
24 from unittest import TestCase
26 from dulwich.errors import (
29 from dulwich.server import (
32 SingleAckGraphWalkerImpl,
33 MultiAckGraphWalkerImpl,
34 MultiAckDetailedGraphWalkerImpl,
37 from dulwich.protocol import (
48 class TestProto(object):
51 self._received = {0: [], 1: [], 2: [], 3: []}
def set_output(self, output_lines):
    """Replace the queued output with normalized copies of ``output_lines``.

    Every entry has its trailing whitespace stripped and exactly one
    newline appended before the resulting list is stored on
    ``self._output``.

    :param output_lines: iterable of strings to queue.
    """
    self._output = ['%s\n' % line.rstrip() for line in output_lines]
56 def read_pkt_line(self):
58 return self._output.pop(0)
def write_sideband(self, band, data):
    """Record ``data`` as received on sideband ``band``.

    ``self._received`` is indexed by ``band`` and the payload is appended
    to the list stored there, so the order of writes is preserved.

    :param band: key into ``self._received``.
    :param data: payload to record.
    :raise KeyError: if ``self._received`` is a dict without ``band``.
    """
    self._received[band].append(data)
65 def write_pkt_line(self, data):
68 self._received[0].append(data)
70 def get_received_line(self, band=0):
71 lines = self._received[band]
78 class UploadPackHandlerTestCase(TestCase):
80 self._handler = UploadPackHandler(None, None, None)
82 def test_set_client_capabilities(self):
84 self._handler.set_client_capabilities([])
85 except GitProtocolError:
89 self._handler.set_client_capabilities([
90 'multi_ack', 'side-band-64k', 'thin-pack', 'ofs-delta'])
91 except GitProtocolError:
94 def test_set_client_capabilities_error(self):
95 self.assertRaises(GitProtocolError,
96 self._handler.set_client_capabilities,
97 ['weird_ack_level', 'ofs-delta'])
99 self._handler.set_client_capabilities(['include-tag'])
100 except GitProtocolError:
104 class TestCommit(object):
105 def __init__(self, sha, parents, commit_time):
107 self._parents = parents
108 self.commit_time = commit_time
110 def get_parents(self):
114 return '%s(%s)' % (self.__class__.__name__, self._sha)
class TestBackend(object):
    """Minimal backend stand-in that only exposes an object store."""

    def __init__(self, objects):
        """Keep ``objects`` available as ``self.object_store``.

        :param objects: the object store; it is stored as given, not copied.
        """
        self.object_store = objects
122 class TestHandler(object):
123 def __init__(self, objects, proto):
124 self.backend = TestBackend(objects)
126 self.stateless_rpc = False
127 self.advertise_refs = False
129 def capabilities(self):
133 class ProtocolGraphWalkerTestCase(TestCase):
135 # Create the following commit tree:
140 ONE: TestCommit(ONE, [], 111),
141 TWO: TestCommit(TWO, [ONE], 222),
142 THREE: TestCommit(THREE, [ONE], 333),
143 FOUR: TestCommit(FOUR, [TWO], 444),
144 FIVE: TestCommit(FIVE, [THREE], 555),
146 self._walker = ProtocolGraphWalker(
147 TestHandler(self._objects, TestProto()))
def test_is_satisfied_no_haves(self):
    """``_is_satisfied`` must be false for every want when the first
    argument (the haves list, going by the test name) is empty."""
    self.assertFalse(self._walker._is_satisfied([], ONE, 0))
    self.assertFalse(self._walker._is_satisfied([], TWO, 0))
    self.assertFalse(self._walker._is_satisfied([], THREE, 0))
def test_is_satisfied_have_root(self):
    """``_is_satisfied`` must be true for ONE, TWO and THREE when the
    first argument is ``[ONE]``."""
    self.assertTrue(self._walker._is_satisfied([ONE], ONE, 0))
    self.assertTrue(self._walker._is_satisfied([ONE], TWO, 0))
    self.assertTrue(self._walker._is_satisfied([ONE], THREE, 0))
159 def test_is_satisfied_have_branch(self):
160 self.assertTrue(self._walker._is_satisfied([TWO], TWO, 0))
162 self.assertFalse(self._walker._is_satisfied([TWO], THREE, 0))
def test_all_wants_satisfied(self):
    """Check ``all_wants_satisfied`` for wants ``[FOUR, FIVE]`` against
    several different have sets."""
    self._walker.set_wants([FOUR, FIVE])
    # trivial case: wants == haves
    self.assertTrue(self._walker.all_wants_satisfied([FOUR, FIVE]))
    # cases that require walking the commit tree
    self.assertTrue(self._walker.all_wants_satisfied([ONE]))
    # a have on only one of the two branches is expected to be insufficient
    self.assertFalse(self._walker.all_wants_satisfied([TWO]))
    self.assertFalse(self._walker.all_wants_satisfied([THREE]))
    self.assertTrue(self._walker.all_wants_satisfied([TWO, THREE]))
174 def test_read_proto_line(self):
175 self._walker.proto.set_output([
183 self.assertEquals(('want', ONE), self._walker.read_proto_line())
184 self.assertEquals(('want', TWO), self._walker.read_proto_line())
185 self.assertEquals(('have', THREE), self._walker.read_proto_line())
186 self.assertRaises(GitProtocolError, self._walker.read_proto_line)
187 self.assertRaises(GitProtocolError, self._walker.read_proto_line)
188 self.assertEquals(('done', None), self._walker.read_proto_line())
189 self.assertEquals((None, None), self._walker.read_proto_line())
191 def test_determine_wants(self):
192 self.assertRaises(GitProtocolError, self._walker.determine_wants, {})
194 self._walker.proto.set_output([
195 'want %s multi_ack' % ONE,
198 heads = {'ref1': ONE, 'ref2': TWO, 'ref3': THREE}
199 self.assertEquals([ONE, TWO], self._walker.determine_wants(heads))
201 self._walker.proto.set_output(['want %s multi_ack' % FOUR])
202 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
204 self._walker.proto.set_output([])
205 self.assertEquals([], self._walker.determine_wants(heads))
207 self._walker.proto.set_output(['want %s multi_ack' % ONE, 'foo'])
208 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
210 self._walker.proto.set_output(['want %s multi_ack' % FOUR])
211 self.assertRaises(GitProtocolError, self._walker.determine_wants, heads)
213 # TODO: test commit time cutoff
216 class TestProtocolGraphWalker(object):
221 self.stateless_rpc = False
222 self.advertise_refs = False
def read_proto_line(self):
    """Remove and return the first entry of ``self.lines``.

    :return: whatever was stored at the front of ``self.lines``.
    :raise IndexError: if ``self.lines`` is an empty list.
    """
    return self.lines.pop(0)
def send_ack(self, sha, ack_type=''):
    """Record an acknowledgement instead of sending it anywhere.

    The pair ``(sha, ack_type)`` is appended to ``self.acks``.

    :param sha: value stored as the first element of the pair.
    :param ack_type: value stored as the second element; defaults to ``''``.
    """
    self.acks.append((sha, ack_type))
231 self.acks.append((None, 'nak'))
233 def all_wants_satisfied(self, haves):
239 return self.acks.pop(0)
242 class AckGraphWalkerImplTestCase(TestCase):
243 """Base setup and asserts for AckGraphWalker tests."""
245 self._walker = TestProtocolGraphWalker()
246 self._walker.lines = [
252 self._impl = self.impl_cls(self._walker)
def assertNoAck(self):
    """Assert that ``self._walker.pop_ack()`` returns ``None``.

    Note that ``pop_ack()`` is called, so whatever it consumes is consumed
    by this check as well.
    """
    # assertEqual rather than the deprecated assertEquals alias, which
    # newer unittest releases no longer provide.
    self.assertEqual(None, self._walker.pop_ack())
257 def assertAcks(self, acks):
258 for sha, ack_type in acks:
259 self.assertEquals((sha, ack_type), self._walker.pop_ack())
def assertAck(self, sha, ack_type=''):
    """Assert a single acknowledgement by delegating to ``assertAcks``.

    :param sha: first element of the expected pair.
    :param ack_type: second element of the expected pair; defaults to ``''``.
    """
    self.assertAcks([(sha, ack_type)])
266 self.assertAck(None, 'nak')
def assertNextEquals(self, sha):
    """Assert that ``self._impl.next()`` returns ``sha``.

    Calling this advances ``self._impl`` by one step.

    :param sha: the value ``self._impl.next()`` is expected to return.
    """
    # assertEqual rather than the deprecated assertEquals alias, which
    # newer unittest releases no longer provide.
    self.assertEqual(sha, self._impl.next())
272 class SingleAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
273 impl_cls = SingleAckGraphWalkerImpl
275 def test_single_ack(self):
276 self.assertNextEquals(TWO)
279 self.assertNextEquals(ONE)
280 self._walker.done = True
284 self.assertNextEquals(THREE)
285 self._impl.ack(THREE)
288 self.assertNextEquals(None)
291 def test_single_ack_flush(self):
292 # same as ack test but ends with a flush-pkt instead of done
293 self._walker.lines[-1] = (None, None)
295 self.assertNextEquals(TWO)
298 self.assertNextEquals(ONE)
299 self._walker.done = True
303 self.assertNextEquals(THREE)
306 self.assertNextEquals(None)
309 def test_single_ack_nak(self):
310 self.assertNextEquals(TWO)
313 self.assertNextEquals(ONE)
316 self.assertNextEquals(THREE)
319 self.assertNextEquals(None)
322 def test_single_ack_nak_flush(self):
323 # same as nak test but ends with a flush-pkt instead of done
324 self._walker.lines[-1] = (None, None)
326 self.assertNextEquals(TWO)
329 self.assertNextEquals(ONE)
332 self.assertNextEquals(THREE)
335 self.assertNextEquals(None)
338 class MultiAckGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
339 impl_cls = MultiAckGraphWalkerImpl
341 def test_multi_ack(self):
342 self.assertNextEquals(TWO)
345 self.assertNextEquals(ONE)
346 self._walker.done = True
348 self.assertAck(ONE, 'continue')
350 self.assertNextEquals(THREE)
351 self._impl.ack(THREE)
352 self.assertAck(THREE, 'continue')
354 self.assertNextEquals(None)
355 self.assertAck(THREE)
357 def test_multi_ack_partial(self):
358 self.assertNextEquals(TWO)
361 self.assertNextEquals(ONE)
363 self.assertAck(ONE, 'continue')
365 self.assertNextEquals(THREE)
368 self.assertNextEquals(None)
369 # done, re-send ack of last common
372 def test_multi_ack_flush(self):
373 self._walker.lines = [
380 self.assertNextEquals(TWO)
383 self.assertNextEquals(ONE)
384 self.assertNak() # nak the flush-pkt
386 self._walker.done = True
388 self.assertAck(ONE, 'continue')
390 self.assertNextEquals(THREE)
391 self._impl.ack(THREE)
392 self.assertAck(THREE, 'continue')
394 self.assertNextEquals(None)
395 self.assertAck(THREE)
397 def test_multi_ack_nak(self):
398 self.assertNextEquals(TWO)
401 self.assertNextEquals(ONE)
404 self.assertNextEquals(THREE)
407 self.assertNextEquals(None)
410 class MultiAckDetailedGraphWalkerImplTestCase(AckGraphWalkerImplTestCase):
411 impl_cls = MultiAckDetailedGraphWalkerImpl
413 def test_multi_ack(self):
414 self.assertNextEquals(TWO)
417 self.assertNextEquals(ONE)
418 self._walker.done = True
420 self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
422 self.assertNextEquals(THREE)
423 self._impl.ack(THREE)
424 self.assertAck(THREE, 'ready')
426 self.assertNextEquals(None)
427 self.assertAck(THREE)
429 def test_multi_ack_partial(self):
430 self.assertNextEquals(TWO)
433 self.assertNextEquals(ONE)
435 self.assertAck(ONE, 'common')
437 self.assertNextEquals(THREE)
440 self.assertNextEquals(None)
441 # done, re-send ack of last common
444 def test_multi_ack_flush(self):
445 # same as ack test but contains a flush-pkt in the middle
446 self._walker.lines = [
453 self.assertNextEquals(TWO)
456 self.assertNextEquals(ONE)
457 self.assertNak() # nak the flush-pkt
459 self._walker.done = True
461 self.assertAcks([(ONE, 'common'), (ONE, 'ready')])
463 self.assertNextEquals(THREE)
464 self._impl.ack(THREE)
465 self.assertAck(THREE, 'ready')
467 self.assertNextEquals(None)
468 self.assertAck(THREE)
470 def test_multi_ack_nak(self):
471 self.assertNextEquals(TWO)
474 self.assertNextEquals(ONE)
477 self.assertNextEquals(THREE)
480 self.assertNextEquals(None)
483 def test_multi_ack_nak_flush(self):
484 # same as nak test but contains a flush-pkt in the middle
485 self._walker.lines = [
492 self.assertNextEquals(TWO)
495 self.assertNextEquals(ONE)
498 self.assertNextEquals(THREE)
501 self.assertNextEquals(None)
504 def test_multi_ack_stateless(self):
505 # transmission ends with a flush-pkt
506 self._walker.lines[-1] = (None, None)
507 self._walker.stateless_rpc = True
509 self.assertNextEquals(TWO)
512 self.assertNextEquals(ONE)
515 self.assertNextEquals(THREE)
518 self.assertNextEquals(None)