build: enable 'nothreads' when JOBS=1
authorAndrew Tridgell <tridge@samba.org>
Wed, 31 Mar 2010 09:56:00 +0000 (20:56 +1100)
committerAndrew Tridgell <tridge@samba.org>
Tue, 6 Apr 2010 10:27:21 +0000 (20:27 +1000)
this makes waf not use pthreads, which should fix the problems on AIX
and maybe on HPUX. It looks like process handling with Python on AIX
is broken if threads are used.

When JOBS=1 we don't need threads anyway.

buildtools/wafsamba/nothreads.py [new file with mode: 0644]
buildtools/wafsamba/wafsamba.py

diff --git a/buildtools/wafsamba/nothreads.py b/buildtools/wafsamba/nothreads.py
new file mode 100644 (file)
index 0000000..a7cfa73
--- /dev/null
@@ -0,0 +1,219 @@
+#!/usr/bin/env python
+# encoding: utf-8
+# Thomas Nagy, 2005-2008 (ita)
+
+# this replaces the core of Runner.py in waf with a varient that works
+# on systems with completely broken threading (such as Python 2.5.x on
+# AIX). For simplicity we enable this when JOBS=1, which is triggered
+# by the compatibility makefile used for the waf build. That also ensures
+# this code is tested, as it means it is used in the build farm, and by
+# anyone using 'make' to build Samba with waf
+
+"Execute the tasks"
+
+import sys, random, time, threading, traceback, os
+try: from Queue import Queue
+except ImportError: from queue import Queue
+import Build, Utils, Logs, Options
+from Logs import debug, error
+from Constants import *
+
+GAP = 15
+
+run_old = threading.Thread.run
+def run(*args, **kwargs):
+       try:
+               run_old(*args, **kwargs)
+       except (KeyboardInterrupt, SystemExit):
+               raise
+       except:
+               sys.excepthook(*sys.exc_info())
+threading.Thread.run = run
+
+
+class TaskConsumer(object):
+       consumers = 1
+
+def process(tsk):
+       m = tsk.master
+       if m.stop:
+               m.out.put(tsk)
+               return
+
+       try:
+               tsk.generator.bld.printout(tsk.display())
+               if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
+               # actual call to task's run() function
+               else: ret = tsk.call_run()
+       except Exception, e:
+               tsk.err_msg = Utils.ex_stack()
+               tsk.hasrun = EXCEPTION
+
+               # TODO cleanup
+               m.error_handler(tsk)
+               m.out.put(tsk)
+               return
+
+       if ret:
+               tsk.err_code = ret
+               tsk.hasrun = CRASHED
+       else:
+               try:
+                       tsk.post_run()
+               except Utils.WafError:
+                       pass
+               except Exception:
+                       tsk.err_msg = Utils.ex_stack()
+                       tsk.hasrun = EXCEPTION
+               else:
+                       tsk.hasrun = SUCCESS
+       if tsk.hasrun != SUCCESS:
+               m.error_handler(tsk)
+
+       m.out.put(tsk)
+
+class Parallel(object):
+       """
+       keep the consumer threads busy, and avoid consuming cpu cycles
+       when no more tasks can be added (end of the build, etc)
+       """
+       def __init__(self, bld, j=2):
+
+               # number of consumers
+               self.numjobs = j
+
+               self.manager = bld.task_manager
+               self.manager.current_group = 0
+
+               self.total = self.manager.total()
+
+               # tasks waiting to be processed - IMPORTANT
+               self.outstanding = []
+               self.maxjobs = MAXJOBS
+
+               # tasks that are awaiting for another task to complete
+               self.frozen = []
+
+               # tasks returned by the consumers
+               self.out = Queue(0)
+
+               self.count = 0 # tasks not in the producer area
+
+               self.processed = 1 # progress indicator
+
+               self.stop = False # error condition to stop the build
+               self.error = False # error flag
+
+       def get_next(self):
+               "override this method to schedule the tasks in a particular order"
+               if not self.outstanding:
+                       return None
+               return self.outstanding.pop(0)
+
+       def postpone(self, tsk):
+               "override this method to schedule the tasks in a particular order"
+               # TODO consider using a deque instead
+               if random.randint(0, 1):
+                       self.frozen.insert(0, tsk)
+               else:
+                       self.frozen.append(tsk)
+
+       def refill_task_list(self):
+               "called to set the next group of tasks"
+
+               while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
+                       self.get_out()
+
+               while not self.outstanding:
+                       if self.count:
+                               self.get_out()
+
+                       if self.frozen:
+                               self.outstanding += self.frozen
+                               self.frozen = []
+                       elif not self.count:
+                               (jobs, tmp) = self.manager.get_next_set()
+                               if jobs != None: self.maxjobs = jobs
+                               if tmp: self.outstanding += tmp
+                               break
+
+       def get_out(self):
+               "the tasks that are put to execute are all collected using get_out"
+               ret = self.out.get()
+               self.manager.add_finished(ret)
+               if not self.stop and getattr(ret, 'more_tasks', None):
+                       self.outstanding += ret.more_tasks
+                       self.total += len(ret.more_tasks)
+               self.count -= 1
+
+       def error_handler(self, tsk):
+               "by default, errors make the build stop (not thread safe so be careful)"
+               if not Options.options.keep:
+                       self.stop = True
+               self.error = True
+
+       def start(self):
+               "execute the tasks"
+
+               while not self.stop:
+
+                       self.refill_task_list()
+
+                       # consider the next task
+                       tsk = self.get_next()
+                       if not tsk:
+                               if self.count:
+                                       # tasks may add new ones after they are run
+                                       continue
+                               else:
+                                       # no tasks to run, no tasks running, time to exit
+                                       break
+
+                       if tsk.hasrun:
+                               # if the task is marked as "run", just skip it
+                               self.processed += 1
+                               self.manager.add_finished(tsk)
+                               continue
+
+                       try:
+                               st = tsk.runnable_status()
+                       except Exception, e:
+                               self.processed += 1
+                               if self.stop and not Options.options.keep:
+                                       tsk.hasrun = SKIPPED
+                                       self.manager.add_finished(tsk)
+                                       continue
+                               self.error_handler(tsk)
+                               self.manager.add_finished(tsk)
+                               tsk.hasrun = EXCEPTION
+                               tsk.err_msg = Utils.ex_stack()
+                               continue
+
+                       if st == ASK_LATER:
+                               self.postpone(tsk)
+                       elif st == SKIP_ME:
+                               self.processed += 1
+                               tsk.hasrun = SKIPPED
+                               self.manager.add_finished(tsk)
+                       else:
+                               # run me: put the task in ready queue
+                               tsk.position = (self.processed, self.total)
+                               self.count += 1
+                               tsk.master = self
+
+                               process(tsk)
+
+               # self.count represents the tasks that have been made available to the consumer threads
+               # collect all the tasks after an error else the message may be incomplete
+               while self.error and self.count:
+                       self.get_out()
+
+               #print loop
+               assert (self.count == 0 or self.stop)
+
+
+# enable nothreads if -j1 is used from the makefile
+if os.environ.get('JOBS') == '1':
+       import Runner
+       Runner.process = process
+       Runner.Parallel = Parallel
index 51d85731eb84a1d94e2a064192b601c59359a1bc..2fc7eceee51538f43941063a3cc61a7e0f10ee4f 100644 (file)
@@ -19,6 +19,7 @@ from samba_python import *
 from samba_deps import *
 from samba_bundled import *
 import samba_conftests
+import nothreads
 
 LIB_PATH="shared"