3 # Thomas Nagy, 2011-2015 (ita)
6 A client for the network cache (playground/netcache/). Launch the server with:
7 ./netcache_server, then use it for the builds by adding the following:
10 bld.load('netcache_client')
12 The parameters should be present in the environment in the form:
13 NETCACHE=host:port waf configure build
15 Or in a more detailed way:
16 NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
19 host: host where the server resides, by default localhost
20 port: by default push on 11001 and pull on 12001
22 Use the server provided in playground/netcache/Netcache.java
25 import os, socket, time, atexit, sys
26 from waflib import Task, Logs, Utils, Build, Runner
27 from waflib.Configure import conf
31 MODES = ['PUSH', 'PULL', 'PUSH_PULL']
32 STALE_TIME = 30 # seconds
39 all_sigs_in_cache = (0.0, [])
41 def put_data(conn, data):
42 if sys.hexversion > 0x3000000:
43 data = data.encode('iso8859-1')
45 while cnt < len(data):
46 sent = conn.send(data[cnt:])
48 raise RuntimeError('connection ended')
51 push_connections = Runner.Queue(0)
52 pull_connections = Runner.Queue(0)
53 def get_connection(push=False):
54 # return a new connection... do not forget to release it!
57 ret = push_connections.get(block=False)
59 ret = pull_connections.get(block=False)
61 ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
63 ret.connect(Task.push_addr)
65 ret.connect(Task.pull_addr)
68 def release_connection(conn, msg='', push=False):
71 push_connections.put(conn)
73 pull_connections.put(conn)
75 def close_connection(conn, msg=''):
77 data = '%s,%s' % (BYE, msg)
79 put_data(conn, data.ljust(HEADER_SIZE))
88 for q in (push_connections, pull_connections):
92 close_connection(conn)
94 # ignore errors when cleaning up
96 atexit.register(close_all)
98 def read_header(conn):
101 while cnt < HEADER_SIZE:
102 data = conn.recv(HEADER_SIZE - cnt)
105 #traceback.print_stack()
106 raise ValueError('connection ended when reading a header %r' % buf)
109 if sys.hexversion > 0x3000000:
110 ret = ''.encode('iso8859-1').join(buf)
111 ret = ret.decode('iso8859-1')
116 def check_cache(conn, ssig):
118 List the files on the server, this is an optimization because it assumes that
119 concurrent builds are rare
121 global all_sigs_in_cache
124 if time.time() - all_sigs_in_cache[0] > STALE_TIME:
127 put_data(conn, ','.join(params).ljust(HEADER_SIZE))
129 # read what is coming back
130 ret = read_header(conn)
131 size = int(ret.split(',')[0])
136 data = conn.recv(min(BUF, size-cnt))
138 raise ValueError('connection ended %r %r' % (cnt, size))
142 if sys.hexversion > 0x3000000:
143 ret = ''.encode('iso8859-1').join(buf)
144 ret = ret.decode('iso8859-1')
148 all_sigs_in_cache = (time.time(), ret.splitlines())
149 Logs.debug('netcache: server cache has %r entries' % len(all_sigs_in_cache[1]))
151 if not ssig in all_sigs_in_cache[1]:
152 raise ValueError('no file %s in cache' % ssig)
154 class MissingFile(Exception):
157 def recv_file(conn, ssig, count, p):
158 check_cache(conn, ssig)
160 params = (GET, ssig, str(count))
161 put_data(conn, ','.join(params).ljust(HEADER_SIZE))
162 data = read_header(conn)
164 size = int(data.split(',')[0])
167 raise MissingFile('no file %s - %s in cache' % (ssig, count))
169 # get the file, writing immediately
170 # TODO a tmp file would be better
174 data = conn.recv(min(BUF, size-cnt))
176 raise ValueError('connection ended %r %r' % (cnt, size))
181 def sock_send(conn, ssig, cnt, p):
182 #print "pushing %r %r %r" % (ssig, cnt, p)
183 size = os.stat(p).st_size
184 params = (PUT, ssig, str(cnt), str(size))
185 put_data(conn, ','.join(params).ljust(HEADER_SIZE))
189 r = f.read(min(BUF, size-cnt))
193 raise ValueError('connection ended')
197 def can_retrieve_cache(self):
198 if not Task.pull_addr:
205 sig = self.signature()
206 ssig = Utils.to_hex(self.uid() + sig)
212 conn = get_connection()
213 for node in self.outputs:
215 recv_file(conn, ssig, cnt, p)
217 except MissingFile as e:
218 Logs.debug('netcache: file is not in the cache %r' % e)
221 except Exception as e:
222 Logs.debug('netcache: could not get the files %r' % e)
225 # broken connection? remove this one
226 close_connection(conn)
229 release_connection(conn)
233 for node in self.outputs:
235 #if self.generator.bld.progress_bar < 1:
236 # self.generator.bld.to_log('restoring from cache %r\n' % node.abspath())
242 def put_files_cache(self):
243 if not Task.push_addr:
247 if getattr(self, 'cached', None):
250 #print "called put_files_cache", id(self)
251 bld = self.generator.bld
252 sig = self.signature()
253 ssig = Utils.to_hex(self.uid() + sig)
258 for node in self.outputs:
259 # We could re-create the signature of the task with the signature of the outputs
260 # in practice, this means hashing the output files
261 # this is unnecessary
264 conn = get_connection(push=True)
265 sock_send(conn, ssig, cnt, node.abspath())
266 except Exception as e:
267 Logs.debug("netcache: could not push the files %r" % e)
269 # broken connection? remove this one
270 close_connection(conn)
274 release_connection(conn, push=True)
276 bld.task_sigs[self.uid()] = self.cache_sig
278 def hash_env_vars(self, env, vars_lst):
279 # reimplement so that the resulting hash does not depend on local paths
285 idx = str(id(env)) + str(vars_lst)
287 cache = self.cache_env
288 except AttributeError:
289 cache = self.cache_env = {}
292 return self.cache_env[idx]
296 v = str([env[a] for a in vars_lst])
297 v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
302 Logs.debug('envhash: %r %r', ret, v)
309 # reimplement so that the signature does not depend on local paths
312 except AttributeError:
314 src = self.generator.bld.srcnode
316 up(self.__class__.__name__.encode())
317 for x in self.inputs + self.outputs:
318 up(x.path_from(src).encode())
319 self.uid_ = m.digest()
323 def make_cached(cls):
324 if getattr(cls, 'nocache', None):
329 if getattr(self, 'nocache', False):
331 if self.can_retrieve_cache():
338 if getattr(self, 'nocache', False):
340 bld = self.generator.bld
343 self.put_files_cache()
344 if hasattr(self, 'chmod'):
345 for node in self.outputs:
346 os.chmod(node.abspath(), self.chmod)
348 cls.post_run = post_run
351 def setup_netcache(ctx, push_addr, pull_addr):
352 Task.Task.can_retrieve_cache = can_retrieve_cache
353 Task.Task.put_files_cache = put_files_cache
355 Task.push_addr = push_addr
356 Task.pull_addr = pull_addr
357 Build.BuildContext.hash_env_vars = hash_env_vars
358 ctx.cache_global = True
360 for x in Task.classes.values():
364 if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
365 Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001')
366 os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
367 os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'
369 if 'NETCACHE' in os.environ:
370 if not 'NETCACHE_PUSH' in os.environ:
371 os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
372 if not 'NETCACHE_PULL' in os.environ:
373 os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']
375 v = os.environ['NETCACHE_PULL']
378 pull_addr = (h, int(p))
382 v = os.environ['NETCACHE_PUSH']
385 push_addr = (h, int(p))
389 setup_netcache(bld, push_addr, pull_addr)