thirdparty:waf: New files for waf 1.9.10
third_party/waf/waflib/extras/netcache_client.py
#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2011-2015 (ita)

"""
A client for the network cache (playground/netcache/). Launch the server with
./netcache_server, then use it for builds by adding the following to the project wscript:

	def build(bld):
		bld.load('netcache_client')

The parameters should be present in the environment in the form:
	NETCACHE=host:port waf configure build

Or, in a more detailed way:
	NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build

where:
	host: host where the server resides, localhost by default
	port: by default push on 11001 and pull on 12001

Use the server provided in playground/netcache/Netcache.java
"""
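
# For example, using the default addresses that build() below falls back to:
#   NETCACHE_PUSH=127.0.0.1:11001 NETCACHE_PULL=127.0.0.1:12001 waf configure build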

import os, socket, time, atexit, sys
from waflib import Task, Logs, Utils, Build, Runner
from waflib.Configure import conf

BUF = 8192 * 16
HEADER_SIZE = 128
MODES = ['PUSH', 'PULL', 'PUSH_PULL']
STALE_TIME = 30 # seconds

GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'
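# Requests to the cache server, as issued by put_data()/read_header() below, start
# with a fixed HEADER_SIZE header of comma-separated, space-padded fields such as
# 'GET,<ssig>,<count>' or 'PUT,<ssig>,<count>,<size>', optionally followed by a raw
# payload of the size announced in the header.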

all_sigs_in_cache = (0.0, []) # (timestamp, cache entries returned by the server), refreshed by check_cache()

def put_data(conn, data):
	if sys.hexversion > 0x3000000:
		data = data.encode('iso8859-1')
	cnt = 0
	while cnt < len(data):
		sent = conn.send(data[cnt:])
		if sent == 0:
			raise RuntimeError('connection ended')
		cnt += sent

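# Idle connections are pooled in the two queues below and reused across tasks:
# get_connection() pops one (or opens a new socket), release_connection() puts it
# back, and close_all() drains the pools at exit.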
push_connections = Runner.Queue(0)
pull_connections = Runner.Queue(0)
def get_connection(push=False):
	# return a new connection... do not forget to release it!
	try:
		if push:
			ret = push_connections.get(block=False)
		else:
			ret = pull_connections.get(block=False)
	except Exception:
		ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		if push:
			ret.connect(Task.push_addr)
		else:
			ret.connect(Task.pull_addr)
	return ret

def release_connection(conn, msg='', push=False):
	if conn:
		if push:
			push_connections.put(conn)
		else:
			pull_connections.put(conn)

def close_connection(conn, msg=''):
	if conn:
		data = '%s,%s' % (BYE, msg)
		try:
			put_data(conn, data.ljust(HEADER_SIZE))
		except:
			pass
		try:
			conn.close()
		except:
			pass

def close_all():
	for q in (push_connections, pull_connections):
		while q.qsize():
			conn = q.get()
			try:
				close_connection(conn)
			except:
				# ignore errors when cleaning up
				pass
atexit.register(close_all)

def read_header(conn):
	cnt = 0
	buf = []
	while cnt < HEADER_SIZE:
		data = conn.recv(HEADER_SIZE - cnt)
		if not data:
			#import traceback
			#traceback.print_stack()
			raise ValueError('connection ended when reading a header %r' % buf)
		buf.append(data)
		cnt += len(data)
	if sys.hexversion > 0x3000000:
		ret = ''.encode('iso8859-1').join(buf)
		ret = ret.decode('iso8859-1')
	else:
		ret = ''.join(buf)
	return ret

def check_cache(conn, ssig):
	"""
	List the files on the server; this is an optimization that assumes
	concurrent builds are rare
	"""
	global all_sigs_in_cache
	if not STALE_TIME:
		return
	if time.time() - all_sigs_in_cache[0] > STALE_TIME:

		params = (LST,'')
		put_data(conn, ','.join(params).ljust(HEADER_SIZE))

		# read what is coming back
		ret = read_header(conn)
		size = int(ret.split(',')[0])

		buf = []
		cnt = 0
		while cnt < size:
			data = conn.recv(min(BUF, size-cnt))
			if not data:
				raise ValueError('connection ended %r %r' % (cnt, size))
			buf.append(data)
			cnt += len(data)

		if sys.hexversion > 0x3000000:
			ret = ''.encode('iso8859-1').join(buf)
			ret = ret.decode('iso8859-1')
		else:
			ret = ''.join(buf)

		all_sigs_in_cache = (time.time(), ret.splitlines())
		Logs.debug('netcache: server cache has %r entries' % len(all_sigs_in_cache[1]))

	if ssig not in all_sigs_in_cache[1]:
		raise ValueError('no file %s in cache' % ssig)

class MissingFile(Exception):
	pass

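# Fetch output number 'count' of the task signature 'ssig' from the cache server
# and write it to the local file 'p'; raises MissingFile if the server does not
# have it.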
def recv_file(conn, ssig, count, p):
	check_cache(conn, ssig)

	params = (GET, ssig, str(count))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	data = read_header(conn)

	size = int(data.split(',')[0])

	if size == -1:
		raise MissingFile('no file %s - %s in cache' % (ssig, count))

	# get the file, writing immediately
	# TODO a tmp file would be better
	f = open(p, 'wb')
	cnt = 0
	while cnt < size:
		data = conn.recv(min(BUF, size-cnt))
		if not data:
			raise ValueError('connection ended %r %r' % (cnt, size))
		f.write(data)
		cnt += len(data)
	f.close()

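# Upload the file 'p' as output number 'cnt' of the task signature 'ssig'.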
def sock_send(conn, ssig, cnt, p):
	#print "pushing %r %r %r" % (ssig, cnt, p)
	size = os.stat(p).st_size
	params = (PUT, ssig, str(cnt), str(size))
	put_data(conn, ','.join(params).ljust(HEADER_SIZE))
	f = open(p, 'rb')
	cnt = 0
	while cnt < size:
		r = f.read(min(BUF, size-cnt))
		while r:
			k = conn.send(r)
			if not k:
				raise ValueError('connection ended')
			cnt += k
			r = r[k:]
	f.close()

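# Bound to Task.Task by setup_netcache() below: try to restore all the task
# outputs from the cache server; returns True and sets self.cached on success.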
def can_retrieve_cache(self):
	if not Task.pull_addr:
		return False
	if not self.outputs:
		return False
	self.cached = False

	cnt = 0
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	err = False
	try:
		try:
			conn = get_connection()
			for node in self.outputs:
				p = node.abspath()
				recv_file(conn, ssig, cnt, p)
				cnt += 1
		except MissingFile as e:
			Logs.debug('netcache: file is not in the cache %r' % e)
			err = True

		except Exception as e:
			Logs.debug('netcache: could not get the files %r' % e)
			err = True

			# broken connection? remove this one
			close_connection(conn)
			conn = None
	finally:
		release_connection(conn)
	if err:
		return False

	for node in self.outputs:
		node.sig = sig
		#if self.generator.bld.progress_bar < 1:
		#	self.generator.bld.to_log('restoring from cache %r\n' % node.abspath())

	self.cached = True
	return True

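# Bound to Task.Task by setup_netcache() below and called from post_run(): push
# the task outputs to the cache server once the task has been built.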
@Utils.run_once
def put_files_cache(self):
	if not Task.push_addr:
		return
	if not self.outputs:
		return
	if getattr(self, 'cached', None):
		return

	#print "called put_files_cache", id(self)
	bld = self.generator.bld
	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	conn = None
	cnt = 0
	try:
		for node in self.outputs:
			# We could re-create the signature of the task with the signature of the outputs
			# in practice, this means hashing the output files
			# this is unnecessary
			try:
				if not conn:
					conn = get_connection(push=True)
				sock_send(conn, ssig, cnt, node.abspath())
			except Exception as e:
				Logs.debug("netcache: could not push the files %r" % e)

				# broken connection? remove this one
				close_connection(conn)
				conn = None
			cnt += 1
	finally:
		release_connection(conn, push=True)

	bld.task_sigs[self.uid()] = self.cache_sig

def hash_env_vars(self, env, vars_lst):
	# reimplement so that the resulting hash does not depend on local paths
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret

	return ret

def uid(self):
	# reimplement so that the signature does not depend on local paths
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_


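# Wrap the run/post_run methods of a task class so that outputs are fetched from
# the cache before running the task and pushed to the cache afterwards.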
def make_cached(cls):
	if getattr(cls, 'nocache', None):
		return

	m1 = cls.run
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = cls.post_run
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		bld = self.generator.bld
		ret = m2(self)
		if bld.cache_global:
			self.put_files_cache()
		if hasattr(self, 'chmod'):
			for node in self.outputs:
				os.chmod(node.abspath(), self.chmod)
		return ret
	cls.post_run = post_run

@conf
def setup_netcache(ctx, push_addr, pull_addr):
	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Task.push_addr = push_addr
	Task.pull_addr = pull_addr
	Build.BuildContext.hash_env_vars = hash_env_vars
	ctx.cache_global = True

	for x in Task.classes.values():
		make_cached(x)

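# Called when the tool is loaded with bld.load('netcache_client'): read
# NETCACHE/NETCACHE_PUSH/NETCACHE_PULL from the environment and install the
# caching hooks on all known task classes.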
def build(bld):
	if 'NETCACHE' not in os.environ and 'NETCACHE_PULL' not in os.environ and 'NETCACHE_PUSH' not in os.environ:
		Logs.warn('Setting NETCACHE_PULL=127.0.0.1:12001 and NETCACHE_PUSH=127.0.0.1:11001')
		os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
		os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'

	if 'NETCACHE' in os.environ:
		if 'NETCACHE_PUSH' not in os.environ:
			os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
		if 'NETCACHE_PULL' not in os.environ:
			os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']

	# use .get() so that defining only one of NETCACHE_PUSH/NETCACHE_PULL does not raise a KeyError
	v = os.environ.get('NETCACHE_PULL', '')
	if v:
		h, p = v.split(':')
		pull_addr = (h, int(p))
	else:
		pull_addr = None

	v = os.environ.get('NETCACHE_PUSH', '')
	if v:
		h, p = v.split(':')
		push_addr = (h, int(p))
	else:
		push_addr = None

	setup_netcache(bld, push_addr, pull_addr)