3 # Thomas Nagy, 2005-2008 (ita)
7 import os, sys, random, time, threading, traceback
8 try: from Queue import Queue
9 except ImportError: from queue import Queue
10 import Build, Utils, Logs, Options
11 from Logs import debug, error
12 from Constants import *
16 run_old = threading.Thread.run
17 def run(*args, **kwargs):
19 run_old(*args, **kwargs)
20 except (KeyboardInterrupt, SystemExit):
23 sys.excepthook(*sys.exc_info())
24 threading.Thread.run = run
26 def process_task(tsk):
34 tsk.generator.bld.printout(tsk.display())
35 if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
36 # actual call to task's run() function
37 else: ret = tsk.call_run()
39 tsk.err_msg = Utils.ex_stack()
40 tsk.hasrun = EXCEPTION
53 except Utils.WafError:
56 tsk.err_msg = Utils.ex_stack()
57 tsk.hasrun = EXCEPTION
60 if tsk.hasrun != SUCCESS:
65 class TaskConsumer(threading.Thread):
70 threading.Thread.__init__(self)
82 tsk = TaskConsumer.ready.get()
85 class Parallel(object):
87 keep the consumer threads busy, and avoid consuming cpu cycles
88 when no more tasks can be added (end of the build, etc)
90 def __init__(self, bld, j=2):
95 self.manager = bld.task_manager
96 self.manager.current_group = 0
98 self.total = self.manager.total()
100 # tasks waiting to be processed - IMPORTANT
101 self.outstanding = []
102 self.maxjobs = MAXJOBS
104 # tasks that are waiting for another task to complete
107 # tasks returned by the consumers
110 self.count = 0 # tasks not in the producer area
112 self.processed = 1 # progress indicator
114 self.stop = False # error condition to stop the build
115 self.error = False # error flag
118 "override this method to schedule the tasks in a particular order"
119 if not self.outstanding:
121 return self.outstanding.pop(0)
123 def postpone(self, tsk):
124 "override this method to schedule the tasks in a particular order"
125 # TODO consider using a deque instead
126 if random.randint(0, 1):
127 self.frozen.insert(0, tsk)
129 self.frozen.append(tsk)
131 def refill_task_list(self):
132 "called to set the next group of tasks"
134 while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
137 while not self.outstanding:
142 self.outstanding += self.frozen
145 (jobs, tmp) = self.manager.get_next_set()
146 if jobs != None: self.maxjobs = jobs
147 if tmp: self.outstanding += tmp
151 "the tasks that are put to execute are all collected using get_out"
153 self.manager.add_finished(ret)
154 if not self.stop and getattr(ret, 'more_tasks', None):
155 self.outstanding += ret.more_tasks
156 self.total += len(ret.more_tasks)
159 def error_handler(self, tsk):
160 "by default, errors make the build stop (not thread safe so be careful)"
161 if not Options.options.keep:
168 if TaskConsumer.consumers:
169 # the worker pool is usually loaded lazily (see below)
170 # in case it is re-used with a different value of numjobs:
171 while len(TaskConsumer.consumers) < self.numjobs:
172 TaskConsumer.consumers.append(TaskConsumer())
176 self.refill_task_list()
178 # consider the next task
179 tsk = self.get_next()
182 # tasks may add new ones after they are run
185 # no tasks to run, no tasks running, time to exit
189 # if the task is marked as "run", just skip it
191 self.manager.add_finished(tsk)
195 st = tsk.runnable_status()
198 if self.stop and not Options.options.keep:
200 self.manager.add_finished(tsk)
202 self.error_handler(tsk)
203 self.manager.add_finished(tsk)
204 tsk.hasrun = EXCEPTION
205 tsk.err_msg = Utils.ex_stack()
213 self.manager.add_finished(tsk)
215 # run me: put the task in ready queue
216 tsk.position = (self.processed, self.total)
221 if self.numjobs == 1:
224 TaskConsumer.ready.put(tsk)
225 # create the consumer threads only if there is something to consume
226 if not TaskConsumer.consumers:
227 TaskConsumer.consumers = [TaskConsumer() for i in xrange(self.numjobs)]
229 # self.count represents the tasks that have been made available to the consumer threads
230 # collect all the tasks after an error, otherwise the message may be incomplete
231 while self.error and self.count:
235 assert (self.count == 0 or self.stop)