build: fixed the task counter when nothreads is used
[nivanova/samba-autobuild/.git] / buildtools / wafsamba / nothreads.py
1 #!/usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2005-2008 (ita)
4
# this replaces the core of Runner.py in waf with a variant that works
6 # on systems with completely broken threading (such as Python 2.5.x on
7 # AIX). For simplicity we enable this when JOBS=1, which is triggered
8 # by the compatibility makefile used for the waf build. That also ensures
9 # this code is tested, as it means it is used in the build farm, and by
10 # anyone using 'make' to build Samba with waf
11
12 "Execute the tasks"
13
14 import sys, random, time, threading, traceback, os
15 try: from Queue import Queue
16 except ImportError: from queue import Queue
17 import Build, Utils, Logs, Options
18 from Logs import debug, error
19 from Constants import *
20
# slack added to numjobs in Parallel.refill_task_list(): finished tasks are
# collected once more than numjobs + GAP tasks are pending
GAP = 15

# keep a handle on the stock implementation before it is replaced below
run_old = threading.Thread.run


def run(*args, **kwargs):
        """
        Wrapper installed as threading.Thread.run.

        Delegates to the original Thread.run with the same arguments.
        KeyboardInterrupt and SystemExit propagate unchanged; any other
        exception is reported through sys.excepthook and is not re-raised.
        """
        try:
                run_old(*args, **kwargs)
        except (KeyboardInterrupt, SystemExit):
                # interruption and interpreter exit must reach the caller
                raise
        except:
                # everything else goes to the global exception hook
                report = sys.excepthook
                report(*sys.exc_info())


threading.Thread.run = run
32
33
class TaskConsumer(object):
        """
        Attribute-only class: it starts no threads and defines no methods.

        NOTE(review): presumably kept so that code looking up
        TaskConsumer.consumers still finds a value - confirm against callers.
        """

        # number of consumers, fixed at one
        consumers = 1
36
def process(tsk):
        """
        Run a single task in the calling thread and return it to its master.

        tsk is a task whose ``master`` attribute (set by Parallel.start) gives
        the scheduler that owns it. The task is always put on ``master.out``
        exactly once before this function returns, whatever the outcome:

        - master already stopped: the task is returned without being run
        - exception while displaying or running: hasrun = EXCEPTION and
          err_msg holds the stack trace
        - non-zero/true return value: hasrun = CRASHED and err_code holds it
        - otherwise post_run() is called; hasrun = SUCCESS when it does not
          raise, EXCEPTION (with err_msg) when it raises something other
          than Utils.WafError

        master.error_handler(tsk) is called whenever the task does not end
        with hasrun == SUCCESS. Returns None.
        """
        m = tsk.master
        if m.stop:
                # the build is being stopped: hand the task back untouched
                m.out.put(tsk)
                return

        try:
                tsk.generator.bld.printout(tsk.display())
                if tsk.__class__.stat:
                        ret = tsk.__class__.stat(tsk)
                else:
                        # actual call to task's run() function
                        ret = tsk.call_run()
        except Exception:
                tsk.err_msg = Utils.ex_stack()
                tsk.hasrun = EXCEPTION

                # TODO cleanup
                m.error_handler(tsk)
                m.out.put(tsk)
                return

        if ret:
                tsk.err_code = ret
                tsk.hasrun = CRASHED
        else:
                try:
                        tsk.post_run()
                except Utils.WafError:
                        # hasrun is left as it was, so the check below
                        # reports the task to the error handler
                        pass
                except Exception:
                        tsk.err_msg = Utils.ex_stack()
                        tsk.hasrun = EXCEPTION
                else:
                        tsk.hasrun = SUCCESS
        if tsk.hasrun != SUCCESS:
                m.error_handler(tsk)

        m.out.put(tsk)
74
class Parallel(object):
        """
        Schedule the tasks of a build and run them one at a time, in the
        calling thread.

        start() pulls tasks from the build's task manager, asks each one for
        its runnable status and executes the ready ones directly through
        process(); no consumer threads are involved. Finished tasks come back
        through the ``out`` queue and are collected by get_out().
        """
        def __init__(self, bld, j=2):
                """
                bld: build context; its task_manager supplies the tasks
                j:   number of jobs, stored as numjobs
                """

                # number of consumers
                self.numjobs = j

                self.manager = bld.task_manager
                self.manager.current_group = 0

                self.total = self.manager.total()

                # tasks waiting to be processed - IMPORTANT
                self.outstanding = []
                self.maxjobs = MAXJOBS

                # tasks that are awaiting for another task to complete
                self.frozen = []

                # tasks returned by the consumers
                self.out = Queue(0)

                self.count = 0 # tasks not in the producer area

                self.processed = 1 # progress indicator

                self.stop = False # error condition to stop the build
                self.error = False # error flag

        def get_next(self):
                """
                override this method to schedule the tasks in a particular order

                Returns the first outstanding task, or None when there is none.
                """
                if not self.outstanding:
                        return None
                return self.outstanding.pop(0)

        def postpone(self, tsk):
                """
                override this method to schedule the tasks in a particular order

                Stores tsk in the frozen list, at the front or at the back
                depending on a random draw.
                """
                # TODO consider using a deque instead
                if random.randint(0, 1):
                        self.frozen.insert(0, tsk)
                else:
                        self.frozen.append(tsk)

        def refill_task_list(self):
                "called to set the next group of tasks"

                # too many tasks pending: collect results before adding more
                while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
                        self.get_out()

                while not self.outstanding:
                        if self.count:
                                self.get_out()

                        if self.frozen:
                                # give the postponed tasks another chance
                                self.outstanding += self.frozen
                                self.frozen = []
                        elif not self.count:
                                # nothing pending and nothing postponed:
                                # ask the manager for the next set of tasks
                                (jobs, tmp) = self.manager.get_next_set()
                                if jobs is not None: self.maxjobs = jobs
                                if tmp: self.outstanding += tmp
                                break

        def get_out(self):
                "the tasks that are put to execute are all collected using get_out"
                ret = self.out.get()
                self.manager.add_finished(ret)
                if not self.stop and getattr(ret, 'more_tasks', None):
                        # a finished task may contribute new tasks to run
                        self.outstanding += ret.more_tasks
                        self.total += len(ret.more_tasks)
                self.count -= 1

        def error_handler(self, tsk):
                "by default, errors make the build stop (not thread safe so be careful)"
                if not Options.options.keep:
                        self.stop = True
                self.error = True

        def start(self):
                """
                execute the tasks

                Loops until the build is stopped or no task is left. Each task
                is either skipped, postponed, or run immediately through
                process(), according to its hasrun flag and runnable_status().
                """

                while not self.stop:

                        self.refill_task_list()

                        # consider the next task
                        tsk = self.get_next()
                        if not tsk:
                                if self.count:
                                        # tasks may add new ones after they are run
                                        continue
                                else:
                                        # no tasks to run, no tasks running, time to exit
                                        break

                        if tsk.hasrun:
                                # if the task is marked as "run", just skip it
                                self.processed += 1
                                self.manager.add_finished(tsk)
                                continue

                        try:
                                st = tsk.runnable_status()
                        except Exception:
                                self.processed += 1
                                if self.stop and not Options.options.keep:
                                        tsk.hasrun = SKIPPED
                                        self.manager.add_finished(tsk)
                                        continue
                                self.error_handler(tsk)
                                self.manager.add_finished(tsk)
                                tsk.hasrun = EXCEPTION
                                tsk.err_msg = Utils.ex_stack()
                                continue

                        if st == ASK_LATER:
                                self.postpone(tsk)
                        elif st == SKIP_ME:
                                self.processed += 1
                                tsk.hasrun = SKIPPED
                                self.manager.add_finished(tsk)
                        else:
                                # run me: the task is executed right away
                                tsk.position = (self.processed, self.total)
                                self.count += 1
                                self.processed += 1
                                tsk.master = self

                                process(tsk)

                # self.count represents the tasks that have been run but not yet collected
                # collect all the tasks after an error else the message may be incomplete
                while self.error and self.count:
                        self.get_out()

                #print loop
                assert (self.count == 0 or self.stop)
214
215
# JOBS=1 in the environment (as set when building through the makefile with
# -j1) switches waf's Runner over to the single-threaded versions from this file
if os.getenv('JOBS') == '1':
        import Runner
        Runner.Parallel = Parallel
        Runner.process = process