Remove unnecessary python path updates for bundled subunit/testtools.
[nivanova/samba-autobuild/.git] / lib / subunit / filters / subunit2gtk
1 #!/usr/bin/env python
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #  
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 ### The GTK progress bar __init__ function is derived from the pygtk tutorial:
18 # The PyGTK Tutorial is Copyright (C) 2001-2005 John Finlay.
19
20 # The GTK Tutorial is Copyright (C) 1997 Ian Main.
21
22 # Copyright (C) 1998-1999 Tony Gale.
23
24 # Permission is granted to make and distribute verbatim copies of this manual
25 # provided the copyright notice and this permission notice are preserved on all
26 # copies.
27
28 # Permission is granted to copy and distribute modified versions of this
29 # document under the conditions for verbatim copying, provided that this
30 # copyright notice is included exactly as in the original, and that the entire
31 # resulting derived work is distributed under the terms of a permission notice
32 # identical to this one.
33
34 # Permission is granted to copy and distribute translations of this document
35 # into another language, under the above conditions for modified versions.
36
37 # If you are intending to incorporate this document into a published work,
38 # please contact the maintainer, and we will make an effort to ensure that you
39 # have the most up to date information available.
40
41 # There is no guarantee that this document lives up to its intended purpose.
42 # This is simply provided as a free resource. As such, the authors and
43 # maintainers of the information provided within can not make any guarantee
44 # that the information is even accurate.
45
46 """Display a subunit stream in a gtk progress window."""
47
48 import sys
49 import unittest
50
51 import pygtk
52 pygtk.require('2.0')
53 import gtk, gtk.gdk, gobject
54
55 from subunit import (
56     PROGRESS_POP,
57     PROGRESS_PUSH,
58     PROGRESS_SET,
59     TestProtocolServer,
60     )
61 from subunit.progress_model import  ProgressModel
62
63
64 class GTKTestResult(unittest.TestResult):
65
66     def __init__(self):
67         super(GTKTestResult, self).__init__()
68         # Instance variables (in addition to TestResult)
69         self.window = None
70         self.run_label = None
71         self.ok_label = None
72         self.not_ok_label = None
73         self.total_tests = None
74
75         self.window = gtk.Window(gtk.WINDOW_TOPLEVEL)
76         self.window.set_resizable(True)
77
78         self.window.connect("destroy", gtk.main_quit)
79         self.window.set_title("Tests...")
80         self.window.set_border_width(0)
81
82         vbox = gtk.VBox(False, 5)
83         vbox.set_border_width(10)
84         self.window.add(vbox)
85         vbox.show()
86
87         # Create a centering alignment object
88         align = gtk.Alignment(0.5, 0.5, 0, 0)
89         vbox.pack_start(align, False, False, 5)
90         align.show()
91
92         # Create the ProgressBar
93         self.pbar = gtk.ProgressBar()
94         align.add(self.pbar)
95         self.pbar.set_text("Running")
96         self.pbar.show()
97         self.progress_model = ProgressModel()
98
99         separator = gtk.HSeparator()
100         vbox.pack_start(separator, False, False, 0)
101         separator.show()
102
103         # rows, columns, homogeneous
104         table = gtk.Table(2, 3, False)
105         vbox.pack_start(table, False, True, 0)
106         table.show()
107         # Show summary details about the run. Could use an expander.
108         label = gtk.Label("Run:")
109         table.attach(label, 0, 1, 1, 2, gtk.EXPAND | gtk.FILL,
110             gtk.EXPAND | gtk.FILL, 5, 5)
111         label.show()
112         self.run_label = gtk.Label("N/A")
113         table.attach(self.run_label, 1, 2, 1, 2, gtk.EXPAND | gtk.FILL,
114             gtk.EXPAND | gtk.FILL, 5, 5)
115         self.run_label.show()
116
117         label = gtk.Label("OK:")
118         table.attach(label, 0, 1, 2, 3, gtk.EXPAND | gtk.FILL,
119             gtk.EXPAND | gtk.FILL, 5, 5)
120         label.show()
121         self.ok_label = gtk.Label("N/A")
122         table.attach(self.ok_label, 1, 2, 2, 3, gtk.EXPAND | gtk.FILL,
123             gtk.EXPAND | gtk.FILL, 5, 5)
124         self.ok_label.show()
125
126         label = gtk.Label("Not OK:")
127         table.attach(label, 0, 1, 3, 4, gtk.EXPAND | gtk.FILL,
128             gtk.EXPAND | gtk.FILL, 5, 5)
129         label.show()
130         self.not_ok_label = gtk.Label("N/A")
131         table.attach(self.not_ok_label, 1, 2, 3, 4, gtk.EXPAND | gtk.FILL,
132             gtk.EXPAND | gtk.FILL, 5, 5)
133         self.not_ok_label.show()
134
135         self.window.show()
136         # For the demo.
137         self.window.set_keep_above(True)
138         self.window.present()
139
140     def stopTest(self, test):
141         super(GTKTestResult, self).stopTest(test)
142         self.progress_model.advance()
143         if self.progress_model.width() == 0:
144             self.pbar.pulse()
145         else:
146             pos = self.progress_model.pos()
147             width = self.progress_model.width()
148             percentage = (pos / float(width))
149             self.pbar.set_fraction(percentage)
150
151     def stopTestRun(self):
152         try:
153             super(GTKTestResult, self).stopTestRun()
154         except AttributeError:
155             pass
156         self.pbar.set_text('Finished')
157
158     def addError(self, test, err):
159         super(GTKTestResult, self).addError(test, err)
160         self.update_counts()
161
162     def addFailure(self, test, err):
163         super(GTKTestResult, self).addFailure(test, err)
164         self.update_counts()
165
166     def addSuccess(self, test):
167         super(GTKTestResult, self).addSuccess(test)
168         self.update_counts()
169
170     def addSkip(self, test, reason):
171         # addSkip is new in Python 2.7/3.1
172         addSkip = getattr(super(GTKTestResult, self), 'addSkip', None)
173         if callable(addSkip):
174             addSkip(test, reason)
175         self.update_counts()
176
177     def addExpectedFailure(self, test, err):
178         # addExpectedFailure is new in Python 2.7/3.1
179         addExpectedFailure = getattr(super(GTKTestResult, self),
180             'addExpectedFailure', None)
181         if callable(addExpectedFailure):
182             addExpectedFailure(test, err)
183         self.update_counts()
184
185     def addUnexpectedSuccess(self, test):
186         # addUnexpectedSuccess is new in Python 2.7/3.1
187         addUnexpectedSuccess = getattr(super(GTKTestResult, self),
188             'addUnexpectedSuccess', None)
189         if callable(addUnexpectedSuccess):
190             addUnexpectedSuccess(test)
191         self.update_counts()
192
193     def progress(self, offset, whence):
194         if whence == PROGRESS_PUSH:
195             self.progress_model.push()
196         elif whence == PROGRESS_POP:
197             self.progress_model.pop()
198         elif whence == PROGRESS_SET:
199             self.total_tests = offset
200             self.progress_model.set_width(offset)
201         else:
202             self.total_tests += offset
203             self.progress_model.adjust_width(offset)
204
205     def time(self, a_datetime):
206         # We don't try to estimate completion yet.
207         pass
208
209     def update_counts(self):
210         self.run_label.set_text(str(self.testsRun))
211         bad = len(self.failures + self.errors)
212         self.ok_label.set_text(str(self.testsRun - bad))
213         self.not_ok_label.set_text(str(bad))
214
215
216 class GIOProtocolTestCase(object):
217
218     def __init__(self, stream, result, on_finish):
219         self.stream = stream
220         self.schedule_read()
221         self.hup_id = gobject.io_add_watch(stream, gobject.IO_HUP, self.hup)
222         self.protocol = TestProtocolServer(result)
223         self.on_finish = on_finish
224
225     def read(self, source, condition, all=False):
226         #NB: \o/ actually blocks
227         line = source.readline()
228         if not line:
229             self.protocol.lostConnection()
230             self.on_finish()
231             return False
232         self.protocol.lineReceived(line)
233         # schedule more IO shortly - if we say we're willing to do it
234         # immediately we starve things.
235         if not all:
236             source_id = gobject.timeout_add(1, self.schedule_read)
237             return False
238         else:
239             return True
240
241     def schedule_read(self):
242         self.read_id = gobject.io_add_watch(self.stream, gobject.IO_IN, self.read)
243
244     def hup(self, source, condition):
245         while self.read(source, condition, all=True): pass
246         self.protocol.lostConnection()
247         gobject.source_remove(self.read_id)
248         self.on_finish()
249         return False
250
251
252 result = GTKTestResult()
253 test = GIOProtocolTestCase(sys.stdin, result, result.stopTestRun)
254 gtk.main()
255 if result.wasSuccessful():
256     exit_code = 0
257 else:
258     exit_code = 1
259 sys.exit(exit_code)