wafsamba: add support for separate rules in stages
[gd/samba-autobuild/.git] / buildtools / wafadmin / Runner.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2008 (ita)
4
5 "Execute the tasks"
6
7 import os, sys, random, time, threading, traceback
8 try: from Queue import Queue
9 except ImportError: from queue import Queue
10 import Build, Utils, Logs, Options
11 from Logs import debug, error
12 from Constants import *
13
14 GAP = 15
15
16 run_old = threading.Thread.run
17 def run(*args, **kwargs):
18         try:
19                 run_old(*args, **kwargs)
20         except (KeyboardInterrupt, SystemExit):
21                 raise
22         except:
23                 sys.excepthook(*sys.exc_info())
24 threading.Thread.run = run
25
26 def process_task(tsk):
27
28         m = tsk.master
29         if m.stop:
30                 m.out.put(tsk)
31                 return
32
33         try:
34                 tsk.generator.bld.printout(tsk.display())
35                 if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
36                 # actual call to task's run() function
37                 else: ret = tsk.call_run()
38         except Exception, e:
39                 tsk.err_msg = Utils.ex_stack()
40                 tsk.hasrun = EXCEPTION
41
42                 # TODO cleanup
43                 m.error_handler(tsk)
44                 m.out.put(tsk)
45                 return
46
47         if ret:
48                 tsk.err_code = ret
49                 tsk.hasrun = CRASHED
50         else:
51                 try:
52                         tsk.post_run()
53                 except Utils.WafError:
54                         pass
55                 except Exception:
56                         tsk.err_msg = Utils.ex_stack()
57                         tsk.hasrun = EXCEPTION
58                 else:
59                         tsk.hasrun = SUCCESS
60         if tsk.hasrun != SUCCESS:
61                 m.error_handler(tsk)
62
63         m.out.put(tsk)
64
65 class TaskConsumer(threading.Thread):
66         ready = Queue(0)
67         consumers = []
68
69         def __init__(self):
70                 threading.Thread.__init__(self)
71                 self.setDaemon(1)
72                 self.start()
73
74         def run(self):
75                 try:
76                         self.loop()
77                 except:
78                         pass
79
80         def loop(self):
81                 while 1:
82                         tsk = TaskConsumer.ready.get()
83                         process_task(tsk)
84
85 class Parallel(object):
86         """
87         keep the consumer threads busy, and avoid consuming cpu cycles
88         when no more tasks can be added (end of the build, etc)
89         """
90         def __init__(self, bld, j=2):
91
92                 # number of consumers
93                 self.numjobs = j
94
95                 self.manager = bld.task_manager
96                 self.manager.current_group = 0
97
98                 self.total = self.manager.total()
99
100                 # tasks waiting to be processed - IMPORTANT
101                 self.outstanding = []
102                 self.maxjobs = MAXJOBS
103
104                 # tasks that are awaiting for another task to complete
105                 self.frozen = []
106
107                 # tasks returned by the consumers
108                 self.out = Queue(0)
109
110                 self.count = 0 # tasks not in the producer area
111
112                 self.processed = 1 # progress indicator
113
114                 self.stop = False # error condition to stop the build
115                 self.error = False # error flag
116
117         def get_next(self):
118                 "override this method to schedule the tasks in a particular order"
119                 if not self.outstanding:
120                         return None
121                 return self.outstanding.pop(0)
122
123         def postpone(self, tsk):
124                 "override this method to schedule the tasks in a particular order"
125                 # TODO consider using a deque instead
126                 if random.randint(0, 1):
127                         self.frozen.insert(0, tsk)
128                 else:
129                         self.frozen.append(tsk)
130
131         def refill_task_list(self):
132                 "called to set the next group of tasks"
133
134                 while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
135                         self.get_out()
136
137                 while not self.outstanding:
138                         if self.count:
139                                 self.get_out()
140
141                         if self.frozen:
142                                 self.outstanding += self.frozen
143                                 self.frozen = []
144                         elif not self.count:
145                                 (jobs, tmp) = self.manager.get_next_set()
146                                 if jobs != None: self.maxjobs = jobs
147                                 if tmp: self.outstanding += tmp
148                                 break
149
150         def get_out(self):
151                 "the tasks that are put to execute are all collected using get_out"
152                 ret = self.out.get()
153                 self.manager.add_finished(ret)
154                 if not self.stop and getattr(ret, 'more_tasks', None):
155                         self.outstanding += ret.more_tasks
156                         self.total += len(ret.more_tasks)
157                 self.count -= 1
158
159         def error_handler(self, tsk):
160                 "by default, errors make the build stop (not thread safe so be careful)"
161                 if not Options.options.keep:
162                         self.stop = True
163                 self.error = True
164
165         def start(self):
166                 "execute the tasks"
167
168                 if TaskConsumer.consumers:
169                         # the worker pool is usually loaded lazily (see below)
170                         # in case it is re-used with a different value of numjobs:
171                         while len(TaskConsumer.consumers) < self.numjobs:
172                                 TaskConsumer.consumers.append(TaskConsumer())
173
174                 while not self.stop:
175
176                         self.refill_task_list()
177
178                         # consider the next task
179                         tsk = self.get_next()
180                         if not tsk:
181                                 if self.count:
182                                         # tasks may add new ones after they are run
183                                         continue
184                                 else:
185                                         # no tasks to run, no tasks running, time to exit
186                                         break
187
188                         if tsk.hasrun:
189                                 # if the task is marked as "run", just skip it
190                                 self.processed += 1
191                                 self.manager.add_finished(tsk)
192                                 continue
193
194                         try:
195                                 st = tsk.runnable_status()
196                         except Exception, e:
197                                 self.processed += 1
198                                 if self.stop and not Options.options.keep:
199                                         tsk.hasrun = SKIPPED
200                                         self.manager.add_finished(tsk)
201                                         continue
202                                 self.error_handler(tsk)
203                                 self.manager.add_finished(tsk)
204                                 tsk.hasrun = EXCEPTION
205                                 tsk.err_msg = Utils.ex_stack()
206                                 continue
207
208                         if st == ASK_LATER:
209                                 self.postpone(tsk)
210                         elif st == SKIP_ME:
211                                 self.processed += 1
212                                 tsk.hasrun = SKIPPED
213                                 self.manager.add_finished(tsk)
214                         else:
215                                 # run me: put the task in ready queue
216                                 tsk.position = (self.processed, self.total)
217                                 self.count += 1
218                                 tsk.master = self
219                                 self.processed += 1
220
221                                 if self.numjobs == 1:
222                                         process_task(tsk)
223                                 else:
224                                         TaskConsumer.ready.put(tsk)
225                                         # create the consumer threads only if there is something to consume
226                                         if not TaskConsumer.consumers:
227                                                 TaskConsumer.consumers = [TaskConsumer() for i in xrange(self.numjobs)]
228
229                 # self.count represents the tasks that have been made available to the consumer threads
230                 # collect all the tasks after an error else the message may be incomplete
231                 while self.error and self.count:
232                         self.get_out()
233
234                 #print loop
235                 assert (self.count == 0 or self.stop)
236