Move waf into third_party/.
[samba.git] / third_party / waf / wafadmin / 3rdparty / prefork.py
1 #! /usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2015 (ita)
4 #
5 # prefer the waf 1.8 version
6
7 """
8 The full samba build can be faster by ~10%, but there are a few limitations:
9 * only one build process should be run at a time as the servers would use the same ports
10 * only one build command is going to be called ("waf build configure build" would not work)
11
12 def build(bld):
13
14     mod = Utils.load_tool('prefork')
15     mod.build(bld)
16     ...
17     (build declarations after)
18 """
19
20 import os, re, socket, threading, sys, subprocess, time, atexit, traceback
21 try:
22         import SocketServer
23 except ImportError:
24         import socketserver as SocketServer
25 try:
26         from queue import Queue
27 except ImportError:
28         from Queue import Queue
29 try:
30         import cPickle
31 except ImportError:
32         import pickle as cPickle
33
# Port used when this file is run as a server without a port argument
DEFAULT_PORT = 51200

# Every message starts with a header of exactly this length (see make_header)
HEADER_SIZE = 128

# Message types, carried as the first comma-separated field of a header
REQ = 'REQ' # request: run the command described by the pickled payload that follows
RES = 'RES' # response: carries the return code and the size of the optional payload
BYE = 'BYE' # makes the request handler leave its processing loop
41
def make_header(params):
        """
        Build the fixed-size header that precedes every message.

        :param params: sequence of strings, joined with commas to form the header
        :return: the header padded on the right with spaces to HEADER_SIZE
                (bytes on Python 3, str on Python 2)
        :raises ValueError: if the joined parameters do not fit in HEADER_SIZE
        """
        header = ','.join(params)
        if sys.hexversion > 0x3000000:
                header = header.encode('iso8859-1')
        if len(header) > HEADER_SIZE:
                # explicit check: an assert would vanish under "python -O" and an
                # oversize header would then corrupt the message stream
                raise ValueError('Header too long (%d > %d): %r' % (len(header), HEADER_SIZE, header))
        return header.ljust(HEADER_SIZE)
49
50
# characters allowed in a header: identifiers, digits, commas and padding spaces
re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
class req(SocketServer.StreamRequestHandler):
        """
        Request handler of the command server: reads commands from one
        connection and executes them until the peer disconnects, sends BYE,
        or an error occurs.
        """
        def handle(self):
                """Process commands in a loop; stop on end of stream or on the first error."""
                while 1:
                        try:
                                if not self.process_command():
                                        # the peer closed the connection; without this exit the
                                        # loop would spin forever on empty reads
                                        break
                        except Exception as e:
                                print(e)
                                break

        def process_command(self):
                """
                Read one header from the connection and act on it.

                :return: True when a command was processed, False when the
                        stream is at its end (nothing left to read)
                :raises ValueError: on a truncated or malformed header, on an
                        unknown message type, and on BYE (used to leave the loop)
                """
                query = self.rfile.read(HEADER_SIZE)
                if not query:
                        return False
                if len(query) != HEADER_SIZE:
                        raise ValueError('Truncated header %r' % query)
                if sys.hexversion > 0x3000000:
                        query = query.decode('iso8859-1')
                if not re_valid_query.match(query):
                        raise ValueError('Invalid query %r' % query)

                query = query.strip().split(',')

                if query[0] == REQ:
                        self.run_command(query[1:])
                elif query[0] == BYE:
                        raise ValueError('Exit')
                else:
                        raise ValueError('Invalid query %r' % query)
                return True

        def run_command(self, query):
                """
                Read the pickled command description, run it, and send back the result.

                :param query: header fields after the message type; the first
                        one is the size of the pickled payload that follows
                :raises ValueError: if the payload is shorter than announced
                """
                size = int(query[0])
                data = self.rfile.read(size)
                if len(data) != size:
                        raise ValueError('Truncated request: expected %d bytes, got %d' % (size, len(data)))
                # NOTE(security): unpickling executes arbitrary code, and the command
                # itself is executed anyway; any process able to connect to this port
                # can run commands as the user owning the server
                kw = cPickle.loads(data)

                # run command
                ret = out = err = exc = None
                cmd = kw.pop('cmd')

                try:
                        if kw['stdout'] or kw['stderr']:
                                # output is captured: collect it to send it back
                                p = subprocess.Popen(cmd, **kw)
                                (out, err) = p.communicate()
                                ret = p.returncode
                        else:
                                ret = subprocess.Popen(cmd, **kw).wait()
                except Exception as e:
                        ret = -1
                        exc = str(e) + traceback.format_exc()

                # write the results; the payload is only sent when there is something to report
                if out or err or exc:
                        data = cPickle.dumps((out, err, exc), -1)
                else:
                        data = b''

                params = [RES, str(ret), str(len(data))]

                self.wfile.write(make_header(params))

                if data:
                        self.wfile.write(data)
119
def create_server(conn, cls):
        """
        Bind a TCP server to the given address and serve requests forever.

        :param conn: (host, port) tuple to listen on
        :param cls: request handler class used for incoming connections
        """
        # A ThreadingTCPServer was used here at some point; the plain TCPServer
        # processes one connection at a time.
        SocketServer.TCPServer.allow_reuse_address = True
        server = SocketServer.TCPServer(conn, cls)
        #server.timeout = 6000 # seconds
        server.serve_forever(poll_interval=0.001)
128
if __name__ == '__main__':
        # Standalone mode: serve commands on the loopback interface, on the port
        # given as first argument or on DEFAULT_PORT when there is none.
        port = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_PORT
        create_server(("127.0.0.1", port), req)
138 else:
139
140         import Runner, Utils
141
142         def init_task_pool(self):
143                 # lazy creation, and set a common pool for all task consumers
144                 pool = self.pool = []
145                 for i in range(self.numjobs):
146                         consumer = Runner.get_pool()
147                         pool.append(consumer)
148                         consumer.idx = i
149                 self.ready = Queue(0)
150                 def setq(consumer):
151                         consumer.ready = self.ready
152                         try:
153                                 threading.current_thread().idx = consumer.idx
154                         except Exception as e:
155                                 print(e)
156                 for x in pool:
157                         x.ready.put(setq)
158                 return pool
159         Runner.Parallel.init_task_pool = init_task_pool
160
161         PORT = 51200
162
163         def make_server(idx):
164                 port = PORT + idx
165                 cmd = [sys.executable, os.path.abspath(__file__), str(port)]
166                 proc = subprocess.Popen(cmd)
167                 proc.port = port
168                 return proc
169
170         def make_conn(srv):
171                 #port = PORT + idx
172                 port = srv.port
173                 conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
174                 conn.connect(('127.0.0.1', port))
175                 return conn
176
177         SERVERS = []
178         CONNS = []
179         def close_all():
180                 while CONNS:
181                         conn = CONNS.pop()
182                         try:
183                                 conn.close()
184                         except:
185                                 pass
186                 while SERVERS:
187                         srv = SERVERS.pop()
188                         try:
189                                 srv.kill()
190                         except:
191                                 pass
192         atexit.register(close_all)
193
194         def put_data(conn, data):
195                 conn.send(data)
196
197         def read_data(conn, siz):
198                 ret = conn.recv(siz)
199                 if not ret:
200                         print("closed connection?")
201
202                 assert(len(ret) == siz)
203                 return ret
204
205         def exec_command(cmd, **kw):
206                 if 'log' in kw:
207                         log = kw['log']
208                         kw['stdout'] = kw['stderr'] = subprocess.PIPE
209                         del(kw['log'])
210                 else:
211                         kw['stdout'] = kw['stderr'] = None
212                 kw['shell'] = isinstance(cmd, str)
213
214                 idx = threading.current_thread().idx
215                 kw['cmd'] = cmd
216
217                 data = cPickle.dumps(kw, -1)
218                 params = [REQ, str(len(data))]
219                 header = make_header(params)
220
221                 conn = CONNS[idx]
222
223                 put_data(conn, header)
224                 put_data(conn, data)
225
226                 data = read_data(conn, HEADER_SIZE)
227                 if sys.hexversion > 0x3000000:
228                         data = data.decode('iso8859-1')
229
230                 lst = data.split(',')
231                 ret = int(lst[1])
232                 dlen = int(lst[2])
233
234                 out = err = None
235                 if dlen:
236                         data = read_data(conn, dlen)
237                         (out, err, exc) = cPickle.loads(data)
238                         if exc:
239                                 raise Utils.WafError('Execution failure: %s' % exc)
240
241                 if out:
242                         log.write(out)
243                 if err:
244                         log.write(err)
245
246                 return ret
247
248         def __init__(self):
249                 threading.Thread.__init__(self)
250
251                 # identifier of the current thread
252                 self.idx = len(SERVERS)
253
254                 # create a server and wait for the connection
255                 srv = make_server(self.idx)
256                 SERVERS.append(srv)
257
258                 conn = None
259                 for x in range(30):
260                         try:
261                                 conn = make_conn(srv)
262                                 break
263                         except socket.error:
264                                 time.sleep(0.01)
265                 if not conn:
266                         raise ValueError('Could not start the server!')
267                 CONNS.append(conn)
268
269                 self.setDaemon(1)
270                 self.start()
271         Runner.TaskConsumer.__init__ = __init__
272
        def build(bld):
                """
                Enable the prefork mode: replace Utils.exec_command by the
                exec_command defined in this module.

                :param bld: build context (not used by this function)
                """
                # dangerous, there is no other command hopefully
                Utils.exec_command = exec_command