3 # Thomas Nagy, 2015 (ita)
6 A version of prefork.py that uses unix sockets. The advantage is that it does not expose
7 connections to the outside. Yet performance it only works on unix-like systems
8 and performance can be slightly worse.
13 # recommended, fork new processes before using more memory
14 opt.load('preforkunix')
17 bld.load('preforkunix')
22 import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time
24 from queue import Queue
26 from Queue import Queue
30 import pickle as cPickle
38 def make_header(params, cookie=''):
39 header = ','.join(params)
40 header = header.ljust(HEADER_SIZE - len(cookie))
41 assert(len(header) == HEADER_SIZE - len(cookie))
42 header = header + cookie
43 if sys.hexversion > 0x3000000:
44 header = header.encode('iso8859-1')
47 re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
49 def send_response(conn, ret, out, err, exc):
51 data = (out, err, exc)
52 data = cPickle.dumps(data, -1)
56 params = [RES, str(ret), str(len(data))]
58 # no need for the cookie in the response
59 conn.send(make_header(params))
63 def process_command(conn):
64 query = conn.recv(HEADER_SIZE)
68 assert(len(query) == HEADER_SIZE)
69 if sys.hexversion > 0x3000000:
70 query = query.decode('iso8859-1')
73 if not re_valid_query.match(query):
74 send_response(conn, -1, '', '', 'Invalid query %r' % query)
75 raise ValueError('Invalid query %r' % query)
77 query = query.strip().split(',')
80 run_command(conn, query[1:])
82 raise ValueError('Exit')
84 raise ValueError('Invalid query %r' % query)
87 def run_command(conn, query):
90 data = conn.recv(size)
91 assert(len(data) == size)
92 kw = cPickle.loads(data)
95 ret = out = err = exc = None
101 if kw['stdout'] or kw['stderr']:
102 p = subprocess.Popen(cmd, **kw)
103 (out, err) = p.communicate()
106 ret = subprocess.Popen(cmd, **kw).wait()
107 except KeyboardInterrupt:
109 except Exception as e:
111 exc = str(e) + traceback.format_exc()
113 send_response(conn, ret, out, err, exc)
117 from waflib import Logs, Utils, Runner, Errors, Options
119 def init_task_pool(self):
120 # lazy creation, and set a common pool for all task consumers
121 pool = self.pool = []
122 for i in range(self.numjobs):
123 consumer = Runner.get_pool()
124 pool.append(consumer)
126 self.ready = Queue(0)
128 consumer.ready = self.ready
130 threading.current_thread().idx = consumer.idx
131 except Exception as e:
136 Runner.Parallel.init_task_pool = init_task_pool
139 child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
143 parent_socket.close()
145 # if the parent crashes, try to exit cleanly
154 os.kill(os.getpid(), signal.SIGKILL)
155 t = threading.Thread(target=reap)
159 # write to child_socket only
161 while process_command(child_socket):
163 except KeyboardInterrupt:
167 return (pid, parent_socket)
185 atexit.register(close_all)
187 def put_data(conn, data):
189 while cnt < len(data):
190 sent = conn.send(data[cnt:])
192 raise RuntimeError('connection ended')
195 def read_data(conn, siz):
199 data = conn.recv(min(siz - cnt, 1024))
201 raise RuntimeError('connection ended %r %r' % (cnt, siz))
204 if sys.hexversion > 0x3000000:
205 ret = ''.encode('iso8859-1').join(buf)
210 def exec_command(self, cmd, **kw):
212 if kw['stdout'] not in (None, subprocess.PIPE):
213 return self.exec_command_old(cmd, **kw)
215 if kw['stderr'] not in (None, subprocess.PIPE):
216 return self.exec_command_old(cmd, **kw)
218 kw['shell'] = isinstance(cmd, str)
219 Logs.debug('runner: %r' % cmd)
220 Logs.debug('runner_env: kw=%s' % kw)
223 self.logger.info(cmd)
225 if 'stdout' not in kw:
226 kw['stdout'] = subprocess.PIPE
227 if 'stderr' not in kw:
228 kw['stderr'] = subprocess.PIPE
230 if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
231 raise Errors.WafError("Program %s not found!" % cmd[0])
233 idx = threading.current_thread().idx
237 #print("sub %r %r" % (idx, cmd))
238 #print("write to %r %r" % (idx, cmd))
240 data = cPickle.dumps(kw, -1)
241 params = [REQ, str(len(data))]
242 header = make_header(params)
246 put_data(conn, header + data)
248 #print("running %r %r" % (idx, cmd))
249 #print("read from %r %r" % (idx, cmd))
251 data = read_data(conn, HEADER_SIZE)
252 if sys.hexversion > 0x3000000:
253 data = data.decode('iso8859-1')
255 #print("received %r" % data)
256 lst = data.split(',')
262 data = read_data(conn, dlen)
263 (out, err, exc) = cPickle.loads(data)
265 raise Errors.WafError('Execution failure: %s' % exc)
268 if not isinstance(out, str):
269 out = out.decode(sys.stdout.encoding or 'iso8859-1')
271 self.logger.debug('out: %s' % out)
273 Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
275 if not isinstance(err, str):
276 err = err.decode(sys.stdout.encoding or 'iso8859-1')
278 self.logger.error('err: %s' % err)
280 Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
285 if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
287 if Utils.unversioned_sys_platform() in ('freebsd',):
289 cmd = ['cpuset', '-l', '0', '-p', str(pid)]
290 elif Utils.unversioned_sys_platform() in ('linux',):
292 cmd = ['taskset', '-pc', '0', str(pid)]
294 self.cmd_and_log(cmd, quiet=0)
297 # memory consumption might be at the lowest point while processing options
298 opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
299 if Utils.is_win32 or os.sep != '/':
301 while len(CONNS) < 30:
302 (pid, conn) = make_conn(opt)
307 if Utils.is_win32 or os.sep != '/':
309 if bld.cmd == 'clean':
311 while len(CONNS) < bld.jobs:
312 (pid, conn) = make_conn(bld)
316 bld.__class__.exec_command_old = bld.__class__.exec_command
317 bld.__class__.exec_command = exec_command