third_party: Update waf to version 2.0.25
[bbaumbach/samba-autobuild/.git] / third_party / waf / waflib / extras / wafcache.py
1 #! /usr/bin/env python
2 # encoding: utf-8
3 # Thomas Nagy, 2019 (ita)
4
5 """
6 Filesystem-based cache system to share and re-use build artifacts
7
8 Cache access operations (copy to and from) are delegated to
9 independent pre-forked worker subprocesses.
10
11 The following environment variables may be set:
12 * WAFCACHE: several possibilities:
13   - File cache:
14     absolute path of the waf cache (~/.cache/wafcache_user,
15     where `user` represents the currently logged-in user)
16   - URL to a cache server, for example:
17     export WAFCACHE=http://localhost:8080/files/
18     in that case, GET/POST requests are made to urls of the form
19     http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
20   - GCS, S3 or MINIO bucket
21     gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
22     s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
23     minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
24 * WAFCACHE_CMD: bucket upload/download command, for example:
25     WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
26   Note that the WAFCACHE bucket value is used for the source or destination
27   depending on the operation (upload or download). For example, with:
28     WAFCACHE="gs://mybucket/"
29   the following commands may be run:
30     gsutil cp build/myprogram  gs://mybucket/aa/aaaaa/1
31     gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
32 * WAFCACHE_NO_PUSH: if set, disables pushing to the cache
33 * WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
34 * WAFCACHE_STATS: if set, displays cache usage statistics on exit
35
36 File cache specific options:
37   Files are copied using hard links by default; if the cache is located
38   onto another partition, the system switches to file copies instead.
39 * WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
40 * WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
41 * WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
42                                    and trim the cache (3 minutes)
43
44 Upload specific options:
45 * WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
46                           this may improve build performance with many/long file uploads
47                           the default is unset (synchronous uploads)
48 * WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
49                          this requires asynchronous uploads to have an effect
50
51 Usage::
52
53         def build(bld):
54                 bld.load('wafcache')
55                 ...
56
57 To troubleshoot::
58
59         waf clean build --zone=wafcache
60 """
61
62 import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex
63 try:
64         import subprocess32 as subprocess
65 except ImportError:
66         import subprocess
67
# location of the file cache: ~/.cache/wafcache_<user>, falling back to
# /tmp when there is no usable ~/.cache directory
base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

# configuration through environment variables; see the module docstring
CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
# prefix of a successful worker reply (error replies are traceback text)
OK = "ok"

# matches the %{SRC} / %{TGT} placeholders in WAFCACHE_CMD
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')
86
87 try:
88         import cPickle
89 except ImportError:
90         import pickle as cPickle
91
92 if __name__ != '__main__':
93         from waflib import Task, Logs, Utils, Build
94
def can_retrieve_cache(self):
	"""
	Task method (monkey-patched onto Task.Task by build() below).

	Try to obtain this task's output files from the cache; returns True
	and sets self.cached when all outputs were fetched, False otherwise.
	"""
	if not self.outputs:
		return False

	self.cached = False

	task_sig = self.signature()
	ssig = Utils.to_hex(self.uid() + task_sig)

	if WAFCACHE_STATS:
		self.generator.bld.cache_reqs += 1

	targets = [out.abspath() for out in self.outputs]
	worker = get_process()
	status = cache_command(worker, ssig, [], targets)
	process_pool.append(worker)

	if not status.startswith(OK):
		# cache miss (or transfer error): the task must run normally
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % targets)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', targets, status)
		return False

	if WAFCACHE_VERBOSITY:
		Logs.pprint('CYAN', '  Fetched %r from cache' % targets)
	else:
		Logs.debug('wafcache: fetched %r from cache', targets)
	if WAFCACHE_STATS:
		self.generator.bld.cache_hits += 1

	self.cached = True
	return True
130
def put_files_cache(self):
	"""
	Task method (monkey-patched onto Task.Task by build() below).

	Upload this task's output files to the cache. Skipped when pushing
	is disabled, when the outputs were themselves fetched from the
	cache, or when any output file is missing on disk.
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	files_from = []
	for node in self.outputs:
		path = node.abspath()
		if not os.path.isfile(path):
			# an output file is missing; do not cache a partial result
			return
		files_from.append(path)

	bld = self.generator.bld
	old_sig = self.signature()

	# invalidate the memoized input signatures so that the call to
	# self.signature() below re-hashes the inputs from disk
	for node in self.inputs:
		try:
			del node.ctx.cache_sig[node]
		except KeyError:
			pass

	delattr(self, 'cache_sig')
	sig = self.signature()

	def _async_put_files_cache(bld, ssig, files_from):
		# runs either inline or on a WAFCACHE_ASYNC_WORKERS thread
		proc = get_process()
		if WAFCACHE_ASYNC_WORKERS:
			with bld.wafcache_lock:
				if bld.wafcache_stop:
					# build shutting down: abandon this upload
					process_pool.append(proc)
					return
				bld.wafcache_procs.add(proc)

		err = cache_command(proc, ssig, files_from, [])
		process_pool.append(proc)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	# upload only if the inputs did not change while the task was running,
	# otherwise the cache entry would not match its signature
	if old_sig == sig:
		ssig = Utils.to_hex(self.uid() + sig)
		if WAFCACHE_ASYNC_WORKERS:
			fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
			bld.wafcache_uploads.append(fut)
		else:
			_async_put_files_cache(bld, ssig, files_from)
	else:
		Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)

	# self.signature() above re-created self.cache_sig
	bld.task_sigs[self.uid()] = self.cache_sig
192
def hash_env_vars(self, env, vars_lst):
	"""
	Replacement for BuildContext.hash_env_vars: hash the selected
	environment variables after stripping the project source path so
	that the result is identical across different checkout locations.
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	key = str(id(env)) + str(vars_lst)
	cache = getattr(self, 'cache_env', None)
	if cache is None:
		cache = self.cache_env = {}
	elif key in cache:
		return cache[key]

	# serialize the values, then erase the source directory path
	data = str([env[name] for name in vars_lst])
	data = data.replace(self.srcnode.abspath().__repr__()[:-1], '')

	digest = Utils.md5()
	digest.update(data.encode())
	ret = digest.digest()

	Logs.debug('envhash: %r %r', ret, data)

	cache[key] = ret
	return ret
224
def uid(self):
	"""
	Replacement for Task.uid(): identify the task by its class name and
	by input/output paths relative to the source directory, so that the
	identifier does not depend on the absolute project location.
	"""
	try:
		return self.uid_
	except AttributeError:
		digest = Utils.md5()
		srcnode = self.generator.bld.srcnode
		digest.update(self.__class__.__name__.encode())
		for node in self.inputs + self.outputs:
			digest.update(node.path_from(srcnode).encode())
		self.uid_ = digest.digest()
		return self.uid_
240
241
def make_cached(cls):
	"""
	Wrap the run/post_run methods of a task class so that results are
	fetched from and stored into the waf cache; idempotent per class.
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	# these classes create/install files in ways that must not be cached
	if "%s.%s" % (cls.__module__, cls.__name__) in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
		return

	orig_run = getattr(cls, 'run', None)
	orig_post_run = getattr(cls, 'post_run', None)

	def run(self):
		# a cache hit short-circuits the real command
		if not getattr(self, 'nocache', False) and self.can_retrieve_cache():
			return 0
		return orig_run(self)

	def post_run(self):
		ret = orig_post_run(self)
		if not getattr(self, 'nocache', False):
			self.put_files_cache()
		return ret

	cls.run = run
	cls.post_run = post_run
	cls.has_cache = True
271
process_pool = []
def get_process():
	"""
	Pop a cache worker process from the pool, spawning a fresh one when
	the pool is empty. Callers are expected to push the process back
	onto process_pool once they are done with it.
	"""
	try:
		# list.pop() is atomic, which matters with async upload threads
		return process_pool.pop()
	except IndexError:
		# run this very file as a worker script (see the __main__ block)
		worker_src = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(worker_src)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
284
def atexit_pool():
	"""Kill every pooled worker subprocess when the build process exits."""
	for worker in process_pool:
		worker.kill()
atexit.register(atexit_pool)
289
def build(bld):
	"""
	Called during the build process to enable file caching: sets up
	optional async upload workers and statistics, pre-allocates worker
	subprocesses and monkey-patches the Task/Build classes.
	"""

	if WAFCACHE_ASYNC_WORKERS:
		try:
			num_workers = int(WAFCACHE_ASYNC_WORKERS)
		except ValueError:
			Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
		else:
			from concurrent.futures import ThreadPoolExecutor
			bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
			bld.wafcache_uploads = []  # futures of pending uploads
			bld.wafcache_procs = set([])  # workers owned by in-flight uploads
			bld.wafcache_stop = False  # signals uploads to abort
			bld.wafcache_lock = threading.Lock()

		# NOTE(review): registered even when the int() conversion above
		# failed, in which case the attributes it reads do not exist --
		# confirm whether this should live inside the else: branch
		def finalize_upload_async(bld):
			if WAFCACHE_ASYNC_NOWAIT:
				with bld.wafcache_lock:
					bld.wafcache_stop = True

				# cancel uploads that have not started yet, newest first
				for fut in reversed(bld.wafcache_uploads):
					fut.cancel()

				# kill the workers of uploads already in progress
				for proc in bld.wafcache_procs:
					proc.kill()

				bld.wafcache_procs.clear()
			else:
				Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
			bld.wafcache_executor.shutdown(wait=True)
		bld.add_post_fun(finalize_upload_async)

	if WAFCACHE_STATS:
		# Init counter for statistics and hook to print results at the end
		bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

		def printstats(bld):
			hit_ratio = 0
			if bld.cache_reqs > 0:
				hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
			Logs.pprint('CYAN', '  wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
					 (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
		bld.add_post_fun(printstats)

	if process_pool:
		# already called once
		return

	# pre-allocation: one worker subprocess per build job
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	# monkey-patch the Task/Build classes so every task becomes cacheable
	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)
351
def cache_command(proc, sig, files_from, files_to):
	"""
	Send one command to a cache worker process and wait for its reply.

	The command is a pickled, base64-encoded tuple of the task
	signature, the files to store into the cache and the files to fetch
	from it (one of the two lists is expected to be empty); the
	unpickled reply is returned.
	"""
	payload = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(payload + '\n'.encode())
	proc.stdin.flush()
	reply = proc.stdout.readline()
	if not reply:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	return cPickle.loads(base64.b64decode(reply))
367
try:
	# hard links are preferred for speed; a missing os.link raises
	# AttributeError (the original `except NameError` could never fire,
	# since attribute access does not raise NameError)
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy `orig` into the cache atomically: write to a temporary name in
	the destination directory, then rename over the final name.

	When hard-linking fails because the cache lives on another
	filesystem (EXDEV), switch permanently to real file copies.
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		# best effort: the directory usually exists already; a real
		# failure will surface in the copy below
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# cache on a different partition: hard links cannot work
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)
394
def lru_trim():
	"""
	Shrink the file cache.

	Cache entries take the form
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`;
	the least recently used entries are removed until at most
	TRIM_MAX_FOLDERS entries remain and their files occupy less than
	EVICT_MAX_BYTES bytes.
	"""
	entries = []
	for prefix in os.listdir(CACHE_DIR):
		if len(prefix) != 2:
			continue
		prefix_dir = os.path.join(CACHE_DIR, prefix)
		for hval in os.listdir(prefix_dir):
			entry = os.path.join(prefix_dir, hval)

			size = 0
			for fname in os.listdir(entry):
				try:
					size += os.lstat(os.path.join(entry, fname)).st_size
				except OSError:
					# file vanished or is unreadable; skip it
					pass
			entries.append((os.stat(entry).st_mtime, size, entry))

	# most recently used first; the list is consumed from the tail
	entries.sort(key=lambda x: x[0])
	entries.reverse()

	total = sum(entry[1] for entry in entries)
	while total > EVICT_MAX_BYTES or len(entries) > TRIM_MAX_FOLDERS:
		_, entry_size, path = entries.pop()
		total -= entry_size

		# rename before deleting so a concurrent reader never sees a
		# half-removed entry under its original name
		removal = path + '.remove'
		try:
			shutil.rmtree(removal)
		except OSError:
			pass
		try:
			os.rename(path, removal)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, removal))
		else:
			try:
				shutil.rmtree(removal)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % removal)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (total, len(entries)))
441
442
def lru_evict():
	"""
	Reduce the cache size, at most once per EVICT_INTERVAL_MINUTES.

	The file CACHE_DIR/all.lock serves a double purpose: its mtime
	records when the last eviction ran, and an flock on it ensures that
	only one process trims the cache at a time.
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first run: create the lock file and skip trimming
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
			except EnvironmentError:
				# another process holds the lock and is already trimming
				if WAFCACHE_VERBOSITY:
					sys.stderr.write('wafcache: another cleaning process is running\n')
			else:
				# now do the actual cleanup
				lru_trim()
				# bump the mtime to restart the eviction interval
				os.utime(lockfile, None)
		finally:
			os.close(fd)
474
class netcache(object):
	"""
	Cache backend for an HTTP cache server; in this configuration
	CACHE_DIR holds the base URL.
	"""
	def __init__(self):
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		# one URL per (task signature, output index) pair
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		target = self.url_of(sig, i)
		with open(file_path, 'rb') as handle:
			payload = handle.read()
		resp = self.http.request('POST', target, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), payload), })
		if resp.status >= 400:
			raise OSError("Invalid status %r %r" % (target, resp.status))

	def download(self, file_path, sig, i):
		source = self.url_of(sig, i)
		with self.http.request('GET', source, preload_content=False, timeout=60) as resp:
			if resp.status >= 400:
				raise OSError("Invalid status %r %r" % (source, resp.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(resp, out)

	def copy_to_cache(self, sig, files_from, files_to):
		# store every non-symlink output; errors are reported as text
		try:
			for idx, path in enumerate(files_from):
				if not os.path.islink(path):
					self.upload(path, sig, idx)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		# fetch every requested output; errors are reported as text
		try:
			for idx, path in enumerate(files_to):
				self.download(path, sig, idx)
		except Exception:
			return traceback.format_exc()
		return OK
515
class fcache(object):
	"""
	Filesystem cache backend: each task entry is a folder
	CACHE_DIR/<sig[:2]>/<sig>/ holding one numbered file per output.
	"""
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			try:
				os.makedirs(CACHE_DIR)
			except OSError:
				# ignore races with other processes creating the folder;
				# the check below decides whether creation succeeded
				pass
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Store files into the cache; existing entries are overwritten and
		each file is copied atomically, though the entry as a whole is not
		"""
		try:
			for idx, src in enumerate(files_from):
				atomic_copy(src, os.path.join(CACHE_DIR, sig[:2], sig, str(idx)))
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Retrieve files from the cache and refresh the entry timestamp
		"""
		try:
			for idx, dest in enumerate(files_to):
				atomic_copy(os.path.join(CACHE_DIR, sig[:2], sig, str(idx)), dest)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
561
class bucket_cache(object):
	"""
	Cache backend for remote object stores (S3, GCS, MinIO); each file
	is transferred by running a command-line tool.
	"""
	def bucket_copy(self, source, target):
		"""
		Copy a single file through the configured command; raises
		OSError (with captured output) when the command fails.
		"""
		if WAFCACHE_CMD:
			# substitute %{SRC} and %{TGT} in the user-provided command
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copy %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		"""Upload the given files to the bucket; returns OK or an error text."""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""Download the given files from the bucket; returns OK or an error text."""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		# catch Exception (the original caught only EnvironmentError) for
		# consistency with the other backends, so unexpected errors are
		# reported to the parent instead of killing the worker process
		except Exception:
			return traceback.format_exc()
		return OK
601
def loop(service):
	"""
	Worker entry point, used when this file runs as a standalone script.

	Read one pickled, base64-encoded [sig, files_from, files_to] command
	per line from stdin, perform the requested transfer through the
	given cache backend, and write the pickled result back to stdout.
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	sys.stdout.write(base64.b64encode(cPickle.dumps(ret)).decode())
	sys.stdout.write('\n')
	sys.stdout.flush()
633
if __name__ == '__main__':
	# running as a pre-forked worker: pick the backend matching the
	# WAFCACHE setting, then serve commands until the parent goes away
	if CACHE_DIR.startswith(('s3://', 'gs://', 'minio://')):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()
	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break
648