wafsamba: try to fix the build on AIX with xlc_r
[samba.git] / buildtools / wafsamba / nothreads.py
1 # encoding: utf-8
2 # Thomas Nagy, 2005-2008 (ita)
3
4 # this replaces the core of Runner.py in waf with a varient that works
5 # on systems with completely broken threading (such as Python 2.5.x on
6 # AIX). For simplicity we enable this when JOBS=1, which is triggered
7 # by the compatibility makefile used for the waf build. That also ensures
8 # this code is tested, as it means it is used in the build farm, and by
9 # anyone using 'make' to build Samba with waf
10
11 "Execute the tasks"
12
13 import sys, random, time, threading, traceback, os
14 try: from Queue import Queue
15 except ImportError: from queue import Queue
16 import Build, Utils, Logs, Options
17 from Logs import debug, error
18 from Constants import *
19
20 GAP = 15
21
22 run_old = threading.Thread.run
23 def run(*args, **kwargs):
24     try:
25         run_old(*args, **kwargs)
26     except (KeyboardInterrupt, SystemExit):
27         raise
28     except:
29         sys.excepthook(*sys.exc_info())
30 threading.Thread.run = run
31
32
33 class TaskConsumer(object):
34     consumers = 1
35
36 def process(tsk):
37     m = tsk.master
38     if m.stop:
39         m.out.put(tsk)
40         return
41
42     try:
43         tsk.generator.bld.printout(tsk.display())
44         if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
45         # actual call to task's run() function
46         else: ret = tsk.call_run()
47     except Exception, e:
48         tsk.err_msg = Utils.ex_stack()
49         tsk.hasrun = EXCEPTION
50
51         # TODO cleanup
52         m.error_handler(tsk)
53         m.out.put(tsk)
54         return
55
56     if ret:
57         tsk.err_code = ret
58         tsk.hasrun = CRASHED
59     else:
60         try:
61             tsk.post_run()
62         except Utils.WafError:
63             pass
64         except Exception:
65             tsk.err_msg = Utils.ex_stack()
66             tsk.hasrun = EXCEPTION
67         else:
68             tsk.hasrun = SUCCESS
69     if tsk.hasrun != SUCCESS:
70         m.error_handler(tsk)
71
72     m.out.put(tsk)
73
74 class Parallel(object):
75     """
76     keep the consumer threads busy, and avoid consuming cpu cycles
77     when no more tasks can be added (end of the build, etc)
78     """
79     def __init__(self, bld, j=2):
80
81         # number of consumers
82         self.numjobs = j
83
84         self.manager = bld.task_manager
85         self.manager.current_group = 0
86
87         self.total = self.manager.total()
88
89         # tasks waiting to be processed - IMPORTANT
90         self.outstanding = []
91         self.maxjobs = MAXJOBS
92
93         # tasks that are awaiting for another task to complete
94         self.frozen = []
95
96         # tasks returned by the consumers
97         self.out = Queue(0)
98
99         self.count = 0 # tasks not in the producer area
100
101         self.processed = 1 # progress indicator
102
103         self.stop = False # error condition to stop the build
104         self.error = False # error flag
105
106     def get_next(self):
107         "override this method to schedule the tasks in a particular order"
108         if not self.outstanding:
109             return None
110         return self.outstanding.pop(0)
111
112     def postpone(self, tsk):
113         "override this method to schedule the tasks in a particular order"
114         # TODO consider using a deque instead
115         if random.randint(0, 1):
116             self.frozen.insert(0, tsk)
117         else:
118             self.frozen.append(tsk)
119
120     def refill_task_list(self):
121         "called to set the next group of tasks"
122
123         while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
124             self.get_out()
125
126         while not self.outstanding:
127             if self.count:
128                 self.get_out()
129
130             if self.frozen:
131                 self.outstanding += self.frozen
132                 self.frozen = []
133             elif not self.count:
134                 (jobs, tmp) = self.manager.get_next_set()
135                 if jobs != None: self.maxjobs = jobs
136                 if tmp: self.outstanding += tmp
137                 break
138
139     def get_out(self):
140         "the tasks that are put to execute are all collected using get_out"
141         ret = self.out.get()
142         self.manager.add_finished(ret)
143         if not self.stop and getattr(ret, 'more_tasks', None):
144             self.outstanding += ret.more_tasks
145             self.total += len(ret.more_tasks)
146         self.count -= 1
147
148     def error_handler(self, tsk):
149         "by default, errors make the build stop (not thread safe so be careful)"
150         if not Options.options.keep:
151             self.stop = True
152         self.error = True
153
154     def start(self):
155         "execute the tasks"
156
157         while not self.stop:
158
159             self.refill_task_list()
160
161             # consider the next task
162             tsk = self.get_next()
163             if not tsk:
164                 if self.count:
165                     # tasks may add new ones after they are run
166                     continue
167                 else:
168                     # no tasks to run, no tasks running, time to exit
169                     break
170
171             if tsk.hasrun:
172                 # if the task is marked as "run", just skip it
173                 self.processed += 1
174                 self.manager.add_finished(tsk)
175                 continue
176
177             try:
178                 st = tsk.runnable_status()
179             except Exception, e:
180                 self.processed += 1
181                 if self.stop and not Options.options.keep:
182                     tsk.hasrun = SKIPPED
183                     self.manager.add_finished(tsk)
184                     continue
185                 self.error_handler(tsk)
186                 self.manager.add_finished(tsk)
187                 tsk.hasrun = EXCEPTION
188                 tsk.err_msg = Utils.ex_stack()
189                 continue
190
191             if st == ASK_LATER:
192                 self.postpone(tsk)
193             elif st == SKIP_ME:
194                 self.processed += 1
195                 tsk.hasrun = SKIPPED
196                 self.manager.add_finished(tsk)
197             else:
198                 # run me: put the task in ready queue
199                 tsk.position = (self.processed, self.total)
200                 self.count += 1
201                 self.processed += 1
202                 tsk.master = self
203
204                 process(tsk)
205
206         # self.count represents the tasks that have been made available to the consumer threads
207         # collect all the tasks after an error else the message may be incomplete
208         while self.error and self.count:
209             self.get_out()
210
211         #print loop
212         assert (self.count == 0 or self.stop)
213
214
215 # enable nothreads
216 import Runner
217 Runner.process = process
218 Runner.Parallel = Parallel