third_party: Update waf to version 2.0.22
[bbaumbach/samba-autobuild/.git] / third_party / waf / waflib / extras / wafcache.py
1 #! /usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2019 (ita)
4
5 """
6 Filesystem-based cache system to share and re-use build artifacts
7
8 Cache access operations (copy to and from) are delegated to
9 independent pre-forked worker subprocesses.
10
11 The following environment variables may be set:
12 * WAFCACHE: several possibilities:
13   - File cache:
14     absolute path of the waf cache (~/.cache/wafcache_user,
15     where `user` represents the currently logged-in user)
16   - URL to a cache server, for example:
17     export WAFCACHE=http://localhost:8080/files/
18     in that case, GET/POST requests are made to urls of the form
19     http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
20   - GCS, S3 or MINIO bucket
21     gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
22     s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
23     minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
24 * WAFCACHE_CMD: bucket upload/download command, for example:
25     WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
26   Note that the WAFCACHE bucket value is used for the source or destination
27   depending on the operation (upload or download). For example, with:
28     WAFCACHE="gs://mybucket/"
29   the following commands may be run:
30     gsutil cp build/myprogram  gs://mybucket/aa/aaaaa/1
31     gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
32 * WAFCACHE_NO_PUSH: if set, disables pushing to the cache
33 * WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
34 * WAFCACHE_STATS: if set, displays cache usage statistics on exit
35
36 File cache specific options:
37   Files are copied using hard links by default; if the cache is located
38   onto another partition, the system switches to file copies instead.
39 * WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
40 * WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
41 * WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
42                                    and trim the cache (3 minutes)
43
44 Usage::
45
46         def build(bld):
47                 bld.load('wafcache')
48                 ...
49
50 To troubleshoot::
51
52         waf clean build --zones=wafcache
53 """
54
55 import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex
56 try:
57         import subprocess32 as subprocess
58 except ImportError:
59         import subprocess
60
# Default cache location: ~/.cache/wafcache_<user>, falling back to /tmp
# when the current user has no ~/.cache directory
base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

# Cache destination: a directory, an http(s) url, or a gs/s3/minio bucket
# (see the module docstring for the accepted forms)
CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
# Optional custom bucket copy command with %{SRC}/%{TGT} placeholders
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
# File-cache trimming thresholds, consumed by lru_trim()/lru_evict()
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
# Feature toggles: enabled by merely defining the environment variable
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
# Prefix expected on worker replies for successful cache operations
OK = "ok"

# Matches the %{SRC} and %{TGT} placeholders inside WAFCACHE_CMD
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

# cPickle is the fast pickler on Python 2; Python 3 only provides pickle
try:
	import cPickle
except ImportError:
	import pickle as cPickle

# waflib is only available when loaded as a waf tool; when this file runs
# standalone it acts as a pre-forked worker process (see __main__ below)
if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build
85
def can_retrieve_cache(self):
	"""
	Attempt to fetch the task outputs from the cache; returns True on success.
	New method for waf Task classes (grafted on by build() below).
	"""
	if not self.outputs:
		return False

	self.cached = False

	ssig = Utils.to_hex(self.uid() + self.signature())

	if WAFCACHE_STATS:
		self.generator.bld.cache_reqs += 1

	targets = [out.abspath() for out in self.outputs]
	err = cache_command(ssig, [], targets)
	if not err.startswith(OK):
		# cache miss (or cache error): the task will have to run
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % targets)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', targets, err)
		return False

	if WAFCACHE_VERBOSITY:
		Logs.pprint('CYAN', '  Fetched %r from cache' % targets)
	else:
		Logs.debug('wafcache: fetched %r from cache', targets)
	if WAFCACHE_STATS:
		self.generator.bld.cache_hits += 1

	# remember the hit so put_files_cache() does not re-upload
	self.cached = True
	return True
119
def put_files_cache(self):
	"""
	Upload the task outputs to the cache after a successful run.
	New method for waf Task classes (grafted on by build() below).
	"""
	# nothing to do when pushing is disabled, the outputs came from the
	# cache already, or the task produces no files
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	files_from = [node.abspath() for node in self.outputs]
	# skip the upload entirely if any declared output is missing
	if not all(os.path.isfile(path) for path in files_from):
		return

	bld = self.generator.bld
	ssig = Utils.to_hex(self.uid() + self.signature())

	err = cache_command(ssig, files_from, [])

	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
		else:
			Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
		if WAFCACHE_STATS:
			self.generator.bld.cache_puts += 1
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
		else:
			Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	# record the signature regardless of the upload outcome
	bld.task_sigs[self.uid()] = self.cache_sig
154
def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	cache = getattr(self, 'cache_env', None)
	if cache is None:
		cache = self.cache_env = {}
	elif idx in cache:
		return cache[idx]

	# strip the project source directory from the values so the hash
	# stays identical across different checkouts
	v = str([env[a] for a in vars_lst]).replace(repr(self.srcnode.abspath())[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret
	return ret
186
def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		# hash the class name plus the project-relative paths of all
		# inputs/outputs instead of absolute paths
		src = self.generator.bld.srcnode
		m = Utils.md5()
		m.update(self.__class__.__name__.encode())
		for node in self.inputs + self.outputs:
			m.update(node.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_
202
203
def make_cached(cls):
	"""
	Enable the waf cache for a given task class by wrapping its
	run/post_run methods; idempotent, and a no-op for opted-out classes
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	# vnum/inst tasks manipulate links and permissions; never cache them
	full_name = "%s.%s" % (cls.__module__, cls.__name__)
	if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
		return

	orig_run = getattr(cls, 'run', None)
	def run(self):
		# a cache hit short-circuits the real command
		if not getattr(self, 'nocache', False) and self.can_retrieve_cache():
			return 0
		return orig_run(self)
	cls.run = run

	orig_post_run = getattr(cls, 'post_run', None)
	def post_run(self):
		ret = orig_post_run(self)
		if not getattr(self, 'nocache', False):
			self.put_files_cache()
		return ret
	cls.post_run = post_run
	cls.has_cache = True
233
# Pool of pre-forked worker subprocesses shared at module level;
# workers are created lazily and returned to the pool after each command
process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		# no idle worker: spawn one by running this very file in standalone
		# mode; the source is passed via -c so no path must be propagated
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		# bufsize=0: unbuffered pipes, each command/reply line is seen immediately
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
246
def atexit_pool():
	"""
	Kill the remaining worker processes when the build process exits
	"""
	for proc in process_pool:
		try:
			os.kill(proc.pid, 9)
		except OSError:
			# the worker already terminated on its own
			pass
		else:
			# reap the worker to avoid leaving a zombie behind
			proc.wait()
atexit.register(atexit_pool)
256
def build(bld):
	"""
	Called during the build process to enable file caching: pre-forks the
	worker pool and grafts the cache-aware methods onto the waf classes.
	"""
	if process_pool:
		# already called once
		return

	# pre-allocate one worker per build job
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)

	if WAFCACHE_STATS:
		# Init counter for statistics and hook to print results at the end
		bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

		def printstats(bld):
			hit_ratio = 0
			if bld.cache_reqs > 0:
				# 100.0 forces float division so the ratio is not
				# truncated to 0 under Python 2 integer division
				hit_ratio = 100.0 * bld.cache_hits / bld.cache_reqs
			# 'hits:' (not 'hits,'): fix the malformed statistics message
			Logs.pprint('CYAN', '  wafcache stats: requests: %s, hits: %s, ratio: %.2f%%, writes %s' %
					 (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )

		bld.add_post_fun(printstats)
288
def cache_command(sig, files_from, files_to):
	"""
	Send one command to a cache worker process and return its reply.

	The command is a pickled, base64-encoded tuple holding the task
	signature, a list of files to cache and a list of files to fetch from
	the cache (one of the two lists is expected to be empty).
	"""
	proc = get_process()

	message = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(message + b'\n')
	proc.stdin.flush()
	reply = proc.stdout.readline()
	if not reply:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	# the worker answered, return it to the pool for reuse
	process_pool.append(proc)
	return cPickle.loads(base64.b64decode(reply))
307
# Prefer hard links for copying into the cache; atomic_copy() switches to
# real copies at runtime when the cache lives on another filesystem.
# NOTE: a platform lacking os.link raises AttributeError on attribute
# access, not NameError as the previous code assumed, so the fallback to
# shutil.copy2 could never trigger before this fix.
try:
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2
312
def atomic_copy(orig, dest):
	"""
	Copy a file into the cache; the operation is atomic for a given file
	(write to a temporary name, then rename into place)
	"""
	global copyfun
	tmp = dest + '.tmp'
	try:
		# best effort: the directory may already exist
		os.makedirs(os.path.dirname(dest))
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno != errno.EXDEV:
			raise
		# hard links cannot cross filesystems: fall back to real copies
		# for this and all subsequent operations
		copyfun = shutil.copy2
		copyfun(orig, tmp)
	os.rename(tmp, dest)
334
def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	# collect (mtime, total size, path) for every task folder in the cache
	lst = []
	for up in os.listdir(CACHE_DIR):
		# cache entries live only under two-character prefix folders
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					try:
						size += os.lstat(os.path.join(path, fname)).st_size
					except OSError:
						# the file may be removed concurrently by another process
						pass
				lst.append((os.stat(path).st_mtime, size, path))

	# newest first, so pop() below evicts the least recently used entry
	# (copy_from_cache refreshes the folder mtime on each cache hit)
	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		# rename before deleting so that a failed rmtree never leaves a
		# half-deleted folder under the original cache entry name
		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass
		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))
381
382
def lru_evict():
	"""
	Reduce the cache size, but at most once per EVICT_INTERVAL_MINUTES
	and by a single process at a time (guarded by a flock-ed lock file)
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first run: create the lock file and skip this cleaning round
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				# non-blocking: if another process holds the lock, skip
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				# refresh the lock mtime so the next check waits a full interval
				os.utime(lockfile, None)
		finally:
			os.close(fd)
414
class netcache(object):
	"""
	Cache backend performing HTTP GET/POST requests against a cache
	server (used when WAFCACHE holds an http(s) url)
	"""
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		# CACHE_DIR holds the server base url in this backend
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		# POST one output file as a multipart form field
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		# GET one file, streaming it to disk without preloading in memory
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		# returns OK on success, or a traceback string to report upstream
		try:
			for i, x in enumerate(files_from):
				# symlinks are skipped on upload — NOTE(review): the download
				# side does not appear to recreate them; confirm intent
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		# returns OK on success, or a traceback string to report upstream
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK
455
class fcache(object):
	"""
	Cache backend storing files in a local directory tree (the default)
	"""
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			os.makedirs(CACHE_DIR)
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Store files into the cache; existing entries are overwritten, and
		the copy is atomic per file only, not for a whole task
		"""
		try:
			for i, src in enumerate(files_from):
				atomic_copy(src, os.path.join(CACHE_DIR, sig[:2], sig, str(i)))
		except Exception:
			return traceback.format_exc()
		# caching succeeded: good opportunity to try trimming the cache,
		# since new data may have pushed it over its size limits
		try:
			lru_evict()
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Restore files from the cache
		"""
		try:
			for i, dest in enumerate(files_to):
				atomic_copy(os.path.join(CACHE_DIR, sig[:2], sig, str(i)), dest)

			# success! refresh the entry timestamp to delay its eviction
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
498
class bucket_cache(object):
	"""
	Cache backend copying files to/from a remote bucket (gs/s3/minio)
	through an external command-line tool
	"""
	def bucket_copy(self, source, target):
		"""
		Copy a single file, raising OSError when the tool exits non-zero
		"""
		if WAFCACHE_CMD:
			# substitute the %{SRC}/%{TGT} placeholders of the custom command
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copy %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Upload files; returns OK or a traceback string to report upstream
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Download files; returns OK or a traceback string to report upstream
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		# catch Exception (previously only EnvironmentError) for consistency
		# with copy_to_cache and the other backends, so unexpected errors are
		# reported to the build instead of killing the worker process
		except Exception:
			return traceback.format_exc()
		return OK
538
def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickled-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	# the non-empty file list selects the direction of the operation
	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	# reply with a single base64 line so the parent can readline() it
	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()
570
# Standalone worker mode: pick the cache backend from the WAFCACHE value
# and serve commands from the parent build process until the pipe closes
if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break