#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket
    gs://my-bucket/ (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/ (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
  WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
  WAFCACHE="gs://mybucket/"
  the following commands may be run:
    gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
  and trim the cache (3 minutes)

Upload specific options:
* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
  this may improve build performance with many/long file uploads
  the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
  this requires asynchronous uploads to have an effect

To troubleshoot::

	waf clean build --zone=wafcache
"""
import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	# fall back when the XDG cache root does not exist
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

# Environment-tunable settings; see the module docstring for their meaning
CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')

# worker replies start with this marker on success
OK = "ok"

# placeholders substituted in WAFCACHE_CMD
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle
# Import waf-specific modules only when loaded as a waf tool; when this file
# runs as a standalone worker subprocess (__main__), waflib may be unavailable
if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build
def can_retrieve_cache(self):
	"""
	New method for waf Task classes: attempt to fetch the task
	outputs from the cache; returns True on a cache hit
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	if WAFCACHE_STATS:
		self.generator.bld.cache_reqs += 1

	files_to = [node.abspath() for node in self.outputs]
	# borrow a pre-forked worker from the pool, return it when done
	proc = get_process()
	err = cache_command(proc, ssig, [], files_to)
	process_pool.append(proc)
	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
		else:
			Logs.debug('wafcache: fetched %r from cache', files_to)
		if WAFCACHE_STATS:
			self.generator.bld.cache_hits += 1
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
		return False

	self.cached = True
	return True
def put_files_cache(self):
	"""
	New method for waf Task classes: push the task outputs to the
	cache after a successful run
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	files_from = []
	for node in self.outputs:
		path = node.abspath()
		if not os.path.isfile(path):
			# do not push partial/missing results
			return
		files_from.append(path)

	bld = self.generator.bld
	old_sig = self.signature()

	# invalidate the input signatures so they are re-computed below;
	# this detects inputs modified while the task was running
	for node in self.inputs:
		try:
			del node.ctx.cache_sig[node]
		except KeyError:
			pass

	delattr(self, 'cache_sig')
	sig = self.signature()

	def _async_put_files_cache(bld, ssig, files_from):
		# may run on an executor thread when WAFCACHE_ASYNC_WORKERS is set
		proc = get_process()
		if WAFCACHE_ASYNC_WORKERS:
			with bld.wafcache_lock:
				if bld.wafcache_stop:
					process_pool.append(proc)
					return
				bld.wafcache_procs.add(proc)

		err = cache_command(proc, ssig, files_from, [])
		process_pool.append(proc)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	if old_sig == sig:
		ssig = Utils.to_hex(self.uid() + sig)
		if WAFCACHE_ASYNC_WORKERS:
			fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
			bld.wafcache_uploads.append(fut)
		else:
			_async_put_files_cache(bld, ssig, files_from)
	else:
		Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)

	bld.task_sigs[self.uid()] = self.cache_sig
def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	# strip the project source directory so that the hash is location-independent
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret
	return ret
def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		# hash paths relative to the source directory, not absolute paths
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_
def make_cached(cls):
	"""
	Enable the waf cache for a given task class by wrapping its
	run/post_run methods; idempotent through the has_cache flag
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	full_name = "%s.%s" % (cls.__module__, cls.__name__)
	if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
		# these task classes only copy/symlink files; caching them is not worthwhile
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		return ret
	cls.post_run = post_run
	cls.has_cache = True
# pool of pre-forked worker subprocesses shared by all tasks
process_pool = []

def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		# pool exhausted: spawn a new worker running this very file as a script
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	# terminate idle workers when the build process exits
	for proc in process_pool:
		proc.kill()
atexit.register(atexit_pool)
def build(bld):
	"""
	Called during the build process to enable file caching
	"""

	if WAFCACHE_ASYNC_WORKERS:
		try:
			num_workers = int(WAFCACHE_ASYNC_WORKERS)
		except ValueError:
			Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
		else:
			from concurrent.futures import ThreadPoolExecutor
			bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
			bld.wafcache_uploads = []
			bld.wafcache_procs = set([])
			bld.wafcache_stop = False
			bld.wafcache_lock = threading.Lock()

			def finalize_upload_async(bld):
				if WAFCACHE_ASYNC_NOWAIT:
					# cancel pending uploads and kill the busy workers
					with bld.wafcache_lock:
						bld.wafcache_stop = True

					for fut in reversed(bld.wafcache_uploads):
						fut.cancel()

					for proc in bld.wafcache_procs:
						proc.kill()

					bld.wafcache_procs.clear()
				else:
					Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
				bld.wafcache_executor.shutdown(wait=True)
			bld.add_post_fun(finalize_upload_async)

	if WAFCACHE_STATS:
		# Init counter for statistics and hook to print results at the end
		bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

		def printstats(bld):
			if bld.cache_reqs > 0:
				hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
				Logs.pprint('CYAN', '  wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
					(bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
		bld.add_post_fun(printstats)

	if process_pool:
		# already called once
		return

	# pre-fork enough workers for the maximum parallelism level
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)
def cache_command(proc, sig, files_from, files_to):
	"""
	Create a command for cache worker processes, returns a pickled
	base64-encoded tuple containing the task signature, a list of files to
	cache and a list of files to get from cache (one of the lists
	is assumed to be empty)
	"""
	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		# an empty read means the worker went away
		raise OSError('Preforked sub-process %r died' % proc.pid)
	return cPickle.loads(base64.b64decode(obj))
# Hard links are fast and atomic; fall back to real copies when unavailable
# or when the cache lives on another filesystem (EXDEV, handled below)
if hasattr(os, 'link'):
	copyfun = os.link
else:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# the cache is on another partition: switch to file copies for good
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	# the final rename makes the destination visible atomically
	os.rename(tmp, dest)
def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		sub = os.path.join(CACHE_DIR, up)
		if not os.path.isdir(sub):
			# plain files such as the lock file are not cache entries
			continue
		for hval in os.listdir(sub):
			path = os.path.join(sub, hval)

			size = 0
			for fname in os.listdir(path):
				try:
					size += os.lstat(os.path.join(path, fname)).st_size
				except OSError:
					# a concurrent eviction may remove files underneath us
					pass
			lst.append((os.stat(path).st_mtime, size, path))

	# most recently used entries first; evict from the tail
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename first so that other processes never observe a half-deleted entry
		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))
def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first use: create the lock file; eviction will run on a later call
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
		except EnvironmentError:
			if WAFCACHE_VERBOSITY:
				sys.stderr.write('wafcache: another cleaning process is running\n')
		else:
			# now do the actual cleanup
			lru_trim()
			# touch the lock file so the next check happens one interval later
			os.utime(lockfile, None)
class netcache(object):
	"""Cache backend that talks to an HTTP cache server (WAFCACHE=http://...)"""
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		# CACHE_DIR holds the server base URL in this backend
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			# report failures as text, the caller logs the message
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK
class fcache(object):
	"""Cache backend storing artifacts in a local/shared directory (default)"""
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			try:
				os.makedirs(CACHE_DIR)
			except OSError:
				# ignore races; verified just below
				pass
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
class bucket_cache(object):
	"""Cache backend delegating transfers to a bucket CLI (aws/gsutil/mc or WAFCACHE_CMD)"""
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			# substitute %{SRC}/%{TGT} placeholders in the user-provided command
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copy %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except EnvironmentError:
			return traceback.format_exc()
		return OK
def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickled-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()
if __name__ == '__main__':
	# standalone worker subprocess: select the cache backend from CACHE_DIR
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	# serve cache commands until the parent closes stdin (sys.exit in loop)
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break