build:wafsamba: Remove unnecessary parameters to cmd_and_log
[samba.git] / third_party / waf / waflib / extras / preforkunix.py
1 #! /usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2015 (ita)
4
5 """
6 A version of prefork.py that uses unix sockets. The advantage is that it does not expose
7 connections to the outside. However, it only works on unix-like systems
8 and performance can be slightly worse.
9
10 To use::
11
12     def options(opt):
13         # recommended, fork new processes before using more memory
14         opt.load('preforkunix')
15
16     def build(bld):
17         bld.load('preforkunix')
18         ...
19         more code
20 """
21
22 import os, re, socket, threading, sys, subprocess, atexit, traceback, signal, time
23 try:
24         from queue import Queue
25 except ImportError:
26         from Queue import Queue
27 try:
28         import cPickle
29 except ImportError:
30         import pickle as cPickle
31
# Every message header is padded to exactly this many characters (see make_header)
HEADER_SIZE = 20

# Message types, carried in the first comma-separated field of a header
REQ = 'REQ' # sent by exec_command: a pickled command description follows the header
RES = 'RES' # sent by send_response: exit status and size of the optional pickled output
BYE = 'BYE' # makes process_command raise ValueError('Exit')
37
def make_header(params, cookie=''):
        """
        Build the fixed-size header that precedes every message.

        :param params: header fields (strings), they are joined with commas
        :param cookie: optional suffix stored in the last characters of the header
        :return: a header of HEADER_SIZE characters, encoded to bytes (iso8859-1) on Python 3
        """
        # space left for the comma-separated fields once the cookie is accounted for
        width = HEADER_SIZE - len(cookie)
        fields = ','.join(params).ljust(width)
        # ljust pads but never truncates, so this trips when the fields do not fit
        assert(len(fields) == width)
        packet = fields + cookie
        if sys.hexversion > 0x3000000:
                return packet.encode('iso8859-1')
        return packet
46
# a decoded header may only contain letters, digits, underscores, commas and spaces
re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
48 if 1:
49         def send_response(conn, ret, out, err, exc):
50                 if out or err or exc:
51                         data = (out, err, exc)
52                         data = cPickle.dumps(data, -1)
53                 else:
54                         data = ''
55
56                 params = [RES, str(ret), str(len(data))]
57
58                 # no need for the cookie in the response
59                 conn.send(make_header(params))
60                 if data:
61                         conn.send(data)
62
63         def process_command(conn):
64                 query = conn.recv(HEADER_SIZE)
65                 if not query:
66                         return None
67                 #print(len(query))
68                 assert(len(query) == HEADER_SIZE)
69                 if sys.hexversion > 0x3000000:
70                         query = query.decode('iso8859-1')
71
72                 #print "%r" % query
73                 if not re_valid_query.match(query):
74                         send_response(conn, -1, '', '', 'Invalid query %r' % query)
75                         raise ValueError('Invalid query %r' % query)
76
77                 query = query.strip().split(',')
78
79                 if query[0] == REQ:
80                         run_command(conn, query[1:])
81                 elif query[0] == BYE:
82                         raise ValueError('Exit')
83                 else:
84                         raise ValueError('Invalid query %r' % query)
85                 return 'ok'
86
87         def run_command(conn, query):
88
89                 size = int(query[0])
90                 data = conn.recv(size)
91                 assert(len(data) == size)
92                 kw = cPickle.loads(data)
93
94                 # run command
95                 ret = out = err = exc = None
96                 cmd = kw['cmd']
97                 del kw['cmd']
98                 #print(cmd)
99
100                 try:
101                         if kw['stdout'] or kw['stderr']:
102                                 p = subprocess.Popen(cmd, **kw)
103                                 (out, err) = p.communicate()
104                                 ret = p.returncode
105                         else:
106                                 ret = subprocess.Popen(cmd, **kw).wait()
107                 except KeyboardInterrupt:
108                         raise
109                 except Exception as e:
110                         ret = -1
111                         exc = str(e) + traceback.format_exc()
112
113                 send_response(conn, ret, out, err, exc)
114
115 if 1:
116
117         from waflib import Logs, Utils, Runner, Errors, Options
118
119         def init_task_pool(self):
120                 # lazy creation, and set a common pool for all task consumers
121                 pool = self.pool = []
122                 for i in range(self.numjobs):
123                         consumer = Runner.get_pool()
124                         pool.append(consumer)
125                         consumer.idx = i
126                 self.ready = Queue(0)
127                 def setq(consumer):
128                         consumer.ready = self.ready
129                         try:
130                                 threading.current_thread().idx = consumer.idx
131                         except Exception as e:
132                                 print(e)
133                 for x in pool:
134                         x.ready.put(setq)
135                 return pool
136         Runner.Parallel.init_task_pool = init_task_pool
137
138         def make_conn(bld):
139                 child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
140                 ppid = os.getpid()
141                 pid = os.fork()
142                 if pid == 0:
143                         parent_socket.close()
144
145                         # if the parent crashes, try to exit cleanly
146                         def reap():
147                                 while 1:
148                                         try:
149                                                 os.kill(ppid, 0)
150                                         except OSError:
151                                                 break
152                                         else:
153                                                 time.sleep(1)
154                                 os.kill(os.getpid(), signal.SIGKILL)
155                         t = threading.Thread(target=reap)
156                         t.setDaemon(True)
157                         t.start()
158
159                         # write to child_socket only
160                         try:
161                                 while process_command(child_socket):
162                                         pass
163                         except KeyboardInterrupt:
164                                 sys.exit(2)
165                 else:
166                         child_socket.close()
167                         return (pid, parent_socket)
168
169         SERVERS = []
170         CONNS = []
171         def close_all():
172                 global SERVERS, CONS
173                 while CONNS:
174                         conn = CONNS.pop()
175                         try:
176                                 conn.close()
177                         except:
178                                 pass
179                 while SERVERS:
180                         pid = SERVERS.pop()
181                         try:
182                                 os.kill(pid, 9)
183                         except:
184                                 pass
185         atexit.register(close_all)
186
187         def put_data(conn, data):
188                 cnt = 0
189                 while cnt < len(data):
190                         sent = conn.send(data[cnt:])
191                         if sent == 0:
192                                 raise RuntimeError('connection ended')
193                         cnt += sent
194
195         def read_data(conn, siz):
196                 cnt = 0
197                 buf = []
198                 while cnt < siz:
199                         data = conn.recv(min(siz - cnt, 1024))
200                         if not data:
201                                 raise RuntimeError('connection ended %r %r' % (cnt, siz))
202                         buf.append(data)
203                         cnt += len(data)
204                 if sys.hexversion > 0x3000000:
205                         ret = ''.encode('iso8859-1').join(buf)
206                 else:
207                         ret = ''.join(buf)
208                 return ret
209
210         def exec_command(self, cmd, **kw):
211                 if 'stdout' in kw:
212                         if kw['stdout'] not in (None, subprocess.PIPE):
213                                 return self.exec_command_old(cmd, **kw)
214                 elif 'stderr' in kw:
215                         if kw['stderr'] not in (None, subprocess.PIPE):
216                                 return self.exec_command_old(cmd, **kw)
217
218                 kw['shell'] = isinstance(cmd, str)
219                 Logs.debug('runner: %r' % cmd)
220                 Logs.debug('runner_env: kw=%s' % kw)
221
222                 if self.logger:
223                         self.logger.info(cmd)
224
225                 if 'stdout' not in kw:
226                         kw['stdout'] = subprocess.PIPE
227                 if 'stderr' not in kw:
228                         kw['stderr'] = subprocess.PIPE
229
230                 if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
231                         raise Errors.WafError("Program %s not found!" % cmd[0])
232
233                 idx = threading.current_thread().idx
234                 kw['cmd'] = cmd
235
236                 # serialization..
237                 #print("sub %r %r" % (idx, cmd))
238                 #print("write to %r %r" % (idx, cmd))
239
240                 data = cPickle.dumps(kw, -1)
241                 params = [REQ, str(len(data))]
242                 header = make_header(params)
243
244                 conn = CONNS[idx]
245
246                 put_data(conn, header + data)
247
248                 #print("running %r %r" % (idx, cmd))
249                 #print("read from %r %r" % (idx, cmd))
250
251                 data = read_data(conn, HEADER_SIZE)
252                 if sys.hexversion > 0x3000000:
253                         data = data.decode('iso8859-1')
254
255                 #print("received %r" % data)
256                 lst = data.split(',')
257                 ret = int(lst[1])
258                 dlen = int(lst[2])
259
260                 out = err = None
261                 if dlen:
262                         data = read_data(conn, dlen)
263                         (out, err, exc) = cPickle.loads(data)
264                         if exc:
265                                 raise Errors.WafError('Execution failure: %s' % exc)
266
267                 if out:
268                         if not isinstance(out, str):
269                                 out = out.decode(sys.stdout.encoding or 'iso8859-1')
270                         if self.logger:
271                                 self.logger.debug('out: %s' % out)
272                         else:
273                                 Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
274                 if err:
275                         if not isinstance(err, str):
276                                 err = err.decode(sys.stdout.encoding or 'iso8859-1')
277                         if self.logger:
278                                 self.logger.error('err: %s' % err)
279                         else:
280                                 Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})
281
282                 return ret
283
284         def init_smp(self):
285                 if not getattr(Options.options, 'smp', getattr(self, 'smp', None)):
286                         return
287                 if Utils.unversioned_sys_platform() in ('freebsd',):
288                         pid = os.getpid()
289                         cmd = ['cpuset', '-l', '0', '-p', str(pid)]
290                 elif Utils.unversioned_sys_platform() in ('linux',):
291                         pid = os.getpid()
292                         cmd = ['taskset', '-pc', '0', str(pid)]
293                 if cmd:
294                         self.cmd_and_log(cmd, quiet=0)
295
296         def options(opt):
297                 # memory consumption might be at the lowest point while processing options
298                 opt.add_option('--pin-process', action='store_true', dest='smp', default=False)
299                 if Utils.is_win32 or os.sep != '/':
300                         return
301                 while len(CONNS) < 30:
302                         (pid, conn) = make_conn(opt)
303                         SERVERS.append(pid)
304                         CONNS.append(conn)
305
306         def build(bld):
307                 if Utils.is_win32 or os.sep != '/':
308                         return
309                 if bld.cmd == 'clean':
310                         return
311                 while len(CONNS) < bld.jobs:
312                         (pid, conn) = make_conn(bld)
313                         SERVERS.append(pid)
314                         CONNS.append(conn)
315                 init_smp(bld)
316                 bld.__class__.exec_command_old = bld.__class__.exec_command
317                 bld.__class__.exec_command = exec_command