wafsamba: Expand tabs.
[samba.git] / buildtools / wafsamba / nothreads.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2008 (ita)
4
5 # this replaces the core of Runner.py in waf with a varient that works
6 # on systems with completely broken threading (such as Python 2.5.x on
7 # AIX). For simplicity we enable this when JOBS=1, which is triggered
8 # by the compatibility makefile used for the waf build. That also ensures
9 # this code is tested, as it means it is used in the build farm, and by
10 # anyone using 'make' to build Samba with waf
11
12 "Execute the tasks"
13
14 import sys, random, time, threading, traceback, os
15 try: from Queue import Queue
16 except ImportError: from queue import Queue
17 import Build, Utils, Logs, Options
18 from Logs import debug, error
19 from Constants import *
20
21 GAP = 15
22
23 run_old = threading.Thread.run
24 def run(*args, **kwargs):
25     try:
26         run_old(*args, **kwargs)
27     except (KeyboardInterrupt, SystemExit):
28         raise
29     except:
30         sys.excepthook(*sys.exc_info())
31 threading.Thread.run = run
32
33
34 class TaskConsumer(object):
35     consumers = 1
36
37 def process(tsk):
38     m = tsk.master
39     if m.stop:
40         m.out.put(tsk)
41         return
42
43     try:
44         tsk.generator.bld.printout(tsk.display())
45         if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
46         # actual call to task's run() function
47         else: ret = tsk.call_run()
48     except Exception, e:
49         tsk.err_msg = Utils.ex_stack()
50         tsk.hasrun = EXCEPTION
51
52         # TODO cleanup
53         m.error_handler(tsk)
54         m.out.put(tsk)
55         return
56
57     if ret:
58         tsk.err_code = ret
59         tsk.hasrun = CRASHED
60     else:
61         try:
62             tsk.post_run()
63         except Utils.WafError:
64             pass
65         except Exception:
66             tsk.err_msg = Utils.ex_stack()
67             tsk.hasrun = EXCEPTION
68         else:
69             tsk.hasrun = SUCCESS
70     if tsk.hasrun != SUCCESS:
71         m.error_handler(tsk)
72
73     m.out.put(tsk)
74
75 class Parallel(object):
76     """
77     keep the consumer threads busy, and avoid consuming cpu cycles
78     when no more tasks can be added (end of the build, etc)
79     """
80     def __init__(self, bld, j=2):
81
82         # number of consumers
83         self.numjobs = j
84
85         self.manager = bld.task_manager
86         self.manager.current_group = 0
87
88         self.total = self.manager.total()
89
90         # tasks waiting to be processed - IMPORTANT
91         self.outstanding = []
92         self.maxjobs = MAXJOBS
93
94         # tasks that are awaiting for another task to complete
95         self.frozen = []
96
97         # tasks returned by the consumers
98         self.out = Queue(0)
99
100         self.count = 0 # tasks not in the producer area
101
102         self.processed = 1 # progress indicator
103
104         self.stop = False # error condition to stop the build
105         self.error = False # error flag
106
107     def get_next(self):
108         "override this method to schedule the tasks in a particular order"
109         if not self.outstanding:
110             return None
111         return self.outstanding.pop(0)
112
113     def postpone(self, tsk):
114         "override this method to schedule the tasks in a particular order"
115         # TODO consider using a deque instead
116         if random.randint(0, 1):
117             self.frozen.insert(0, tsk)
118         else:
119             self.frozen.append(tsk)
120
121     def refill_task_list(self):
122         "called to set the next group of tasks"
123
124         while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
125             self.get_out()
126
127         while not self.outstanding:
128             if self.count:
129                 self.get_out()
130
131             if self.frozen:
132                 self.outstanding += self.frozen
133                 self.frozen = []
134             elif not self.count:
135                 (jobs, tmp) = self.manager.get_next_set()
136                 if jobs != None: self.maxjobs = jobs
137                 if tmp: self.outstanding += tmp
138                 break
139
140     def get_out(self):
141         "the tasks that are put to execute are all collected using get_out"
142         ret = self.out.get()
143         self.manager.add_finished(ret)
144         if not self.stop and getattr(ret, 'more_tasks', None):
145             self.outstanding += ret.more_tasks
146             self.total += len(ret.more_tasks)
147         self.count -= 1
148
149     def error_handler(self, tsk):
150         "by default, errors make the build stop (not thread safe so be careful)"
151         if not Options.options.keep:
152             self.stop = True
153         self.error = True
154
155     def start(self):
156         "execute the tasks"
157
158         while not self.stop:
159
160             self.refill_task_list()
161
162             # consider the next task
163             tsk = self.get_next()
164             if not tsk:
165                 if self.count:
166                     # tasks may add new ones after they are run
167                     continue
168                 else:
169                     # no tasks to run, no tasks running, time to exit
170                     break
171
172             if tsk.hasrun:
173                 # if the task is marked as "run", just skip it
174                 self.processed += 1
175                 self.manager.add_finished(tsk)
176                 continue
177
178             try:
179                 st = tsk.runnable_status()
180             except Exception, e:
181                 self.processed += 1
182                 if self.stop and not Options.options.keep:
183                     tsk.hasrun = SKIPPED
184                     self.manager.add_finished(tsk)
185                     continue
186                 self.error_handler(tsk)
187                 self.manager.add_finished(tsk)
188                 tsk.hasrun = EXCEPTION
189                 tsk.err_msg = Utils.ex_stack()
190                 continue
191
192             if st == ASK_LATER:
193                 self.postpone(tsk)
194             elif st == SKIP_ME:
195                 self.processed += 1
196                 tsk.hasrun = SKIPPED
197                 self.manager.add_finished(tsk)
198             else:
199                 # run me: put the task in ready queue
200                 tsk.position = (self.processed, self.total)
201                 self.count += 1
202                 self.processed += 1
203                 tsk.master = self
204
205                 process(tsk)
206
207         # self.count represents the tasks that have been made available to the consumer threads
208         # collect all the tasks after an error else the message may be incomplete
209         while self.error and self.count:
210             self.get_out()
211
212         #print loop
213         assert (self.count == 0 or self.stop)
214
215
216 # enable nothreads
217 import Runner
218 Runner.process = process
219 Runner.Parallel = Parallel