Linux 6.10-rc4
[sfrench/cifs-2.6.git] / tools / testing / kunit / kunit_kernel.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Runs UML kernel, collects output, and handles errors.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import importlib.abc
10 import importlib.util
11 import logging
12 import subprocess
13 import os
14 import shlex
15 import shutil
16 import signal
17 import threading
18 from typing import Iterator, List, Optional, Tuple
19 from types import FrameType
20
21 import kunit_config
22 import qemu_config
23
# Name of the kernel configuration file; joined onto the build directory by
# get_kconfig_path().
KCONFIG_PATH = '.config'
# Name of the kunitconfig file looked up inside the build directory (or inside
# a directory passed as a kunitconfig path).
KUNITCONFIG_PATH = '.kunitconfig'
# Copy of the kunitconfig written after a successful build_config(); compared
# against the current one to decide whether .config must be regenerated.
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
# Copied into the build directory when it has no .kunitconfig yet.
# NOTE(review): relative path, presumably resolved against the kernel source
# root as the current working directory - confirm with the callers.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
# Not referenced in this part of the file.
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
# Kconfig fragment parsed by LinuxSourceTreeOperationsUml.make_arch_config().
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
# Name of the file, inside the build directory, that run_kernel() tees the
# kernel output into.
OUTFILE_PATH = 'test.log'
# Absolute path of the directory containing this file.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
# Directory searched for '<arch>.py' QEMU configs.
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
        """Raised when a step of configuring the Linux kernel fails."""
36
37
class BuildError(Exception):
        """Raised when a step of building the Linux kernel fails."""
40
41
class LinuxSourceTreeOperations:
        """An abstraction over command line operations performed on a source tree.

        Wraps the `make` invocations used to clean, configure and build the
        kernel. make_arch_config() and start() are the hooks that the
        per-target subclasses override.
        """

        def __init__(self, linux_arch: str, cross_compile: Optional[str]):
                """Stores the ARCH= value and the optional CROSS_COMPILE= prefix."""
                self._linux_arch = linux_arch
                self._cross_compile = cross_compile

        def make_mrproper(self) -> None:
                """Runs `make mrproper`.

                Raises:
                        ConfigError: if make could not be executed, or exited non-zero
                                (the message is then make's combined stdout/stderr).
                """
                try:
                        subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
                except OSError as e:
                        raise ConfigError('Could not call make command: ' + str(e)) from e
                except subprocess.CalledProcessError as e:
                        raise ConfigError(e.output.decode()) from e

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns base_kunitconfig unchanged; subclasses add arch-specific options."""
                return base_kunitconfig

        def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
                """Runs `make olddefconfig` for build_dir and prints the command used.

                Raises:
                        ConfigError: if make could not be executed, or exited non-zero
                                (the message is then make's combined stdout/stderr).
                """
                command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
                if self._cross_compile:
                        command += ['CROSS_COMPILE=' + self._cross_compile]
                if make_options:
                        command.extend(make_options)
                print('Populating config with:\n$', ' '.join(command))
                try:
                        subprocess.check_output(command, stderr=subprocess.STDOUT)
                except OSError as e:
                        raise ConfigError('Could not call make command: ' + str(e)) from e
                except subprocess.CalledProcessError as e:
                        raise ConfigError(e.output.decode()) from e

        def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
                """Builds the kernel in build_dir using `jobs` parallel make jobs.

                make's stdout is discarded; anything it writes to stderr is printed
                after a successful build.

                Raises:
                        BuildError: if make could not be executed, or exited non-zero
                                (the message is then make's stderr).
                """
                command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
                if make_options:
                        command.extend(make_options)
                if self._cross_compile:
                        command += ['CROSS_COMPILE=' + self._cross_compile]
                print('Building with:\n$', ' '.join(command))
                # Popen() only reports failures to launch (OSError); a non-zero exit
                # status is detected through returncode below.
                try:
                        proc = subprocess.Popen(command,
                                                stderr=subprocess.PIPE,
                                                stdout=subprocess.DEVNULL)
                except OSError as e:
                        raise BuildError('Could not call execute make: ' + str(e)) from e
                _, stderr = proc.communicate()
                if proc.returncode != 0:
                        raise BuildError(stderr.decode())
                if stderr:  # likely only due to build warnings
                        print(stderr.decode())

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Starts the built kernel with `params`; must be overridden.

                Raises:
                        NotImplementedError: always. It is a RuntimeError subclass, so
                                callers catching RuntimeError keep working.
                """
                raise NotImplementedError('not implemented!')
97
98
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
        """Source tree operations for a kernel that is run under QEMU."""

        def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
                """Copies the settings needed to configure and launch from qemu_arch_params."""
                super().__init__(linux_arch=qemu_arch_params.linux_arch,
                                 cross_compile=cross_compile)
                self._kconfig = qemu_arch_params.kconfig
                self._qemu_arch = qemu_arch_params.qemu_arch
                self._kernel_path = qemu_arch_params.kernel_path
                # Every run gets kunit_shutdown=reboot appended to the kernel command line.
                self._kernel_command_line = ' '.join(
                        [qemu_arch_params.kernel_command_line, 'kunit_shutdown=reboot'])
                self._extra_qemu_params = qemu_arch_params.extra_qemu_params
                self._serial = qemu_arch_params.serial

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns the arch's Kconfig fragment with base_kunitconfig merged into it."""
                arch_kconfig = kunit_config.parse_from_string(self._kconfig)
                arch_kconfig.merge_in_entries(base_kunitconfig)
                return arch_kconfig

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Prints the qemu-system-<arch> command line and launches it.

                The returned process has text-mode pipes for stdin and stdout, with
                stderr folded into stdout.
                """
                append_arg = ' '.join(params + [self._kernel_command_line])
                base_command = [
                        'qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', os.path.join(build_dir, self._kernel_path),
                        '-append', append_arg,
                        '-no-reboot',
                        '-nographic',
                        '-serial', self._serial,
                ]
                qemu_command = base_command + self._extra_qemu_params
                # shlex.join() would do the same, but it needs Python 3.8+.
                quoted_command = ' '.join(map(shlex.quote, qemu_command))
                print('Running tests with:\n$', quoted_command)
                return subprocess.Popen(qemu_command,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        text=True, errors='backslashreplace')
133
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
        """Source tree operations for a User Mode Linux (ARCH=um) kernel."""

        def __init__(self, cross_compile: Optional[str]=None):
                """Fixes the architecture to 'um'; cross_compile is passed through."""
                super().__init__(linux_arch='um', cross_compile=cross_compile)

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns the Kconfig parsed from UML_KCONFIG_PATH with base_kunitconfig merged in."""
                kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
                kconfig.merge_in_entries(base_kunitconfig)
                return kconfig

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Runs the Linux UML binary. Must be named 'linux'.

                `params` is left untouched. The returned process has text-mode pipes
                for stdin and stdout, with stderr folded into stdout.
                """
                linux_bin = os.path.join(build_dir, 'linux')
                # Build a new list instead of extending the caller's one in place, so
                # reusing the same list for another run does not pile up duplicates.
                uml_params = list(params) + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
                print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
                return subprocess.Popen([linux_bin] + uml_params,
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.STDOUT,
                                        text=True, errors='backslashreplace')
155
def get_kconfig_path(build_dir: str) -> str:
        """Returns the location of the kernel .config inside build_dir."""
        kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
        return kconfig_path
158
def get_kunitconfig_path(build_dir: str) -> str:
        """Returns the location of the .kunitconfig inside build_dir."""
        kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
        return kunitconfig_path
161
def get_old_kunitconfig_path(build_dir: str) -> str:
        """Returns the location of the last-used kunitconfig copy inside build_dir."""
        old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
        return old_kunitconfig_path
164
def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
        """Loads the kunitconfig to use for build_dir.

        With explicit kunitconfig_paths, each entry (a file, or a directory
        containing a .kunitconfig) is parsed and merged in order. Without them,
        build_dir's own .kunitconfig is parsed, after first being created from
        the default config if it does not exist.

        Raises:
                ConfigError: if a given path does not exist, or if two of the given
                        configs conflict with each other.
        """
        if kunitconfig_paths:
                merged = kunit_config.Kconfig()
                for given_path in kunitconfig_paths:
                        # A directory stands for the .kunitconfig it contains.
                        if os.path.isdir(given_path):
                                config_file = os.path.join(given_path, KUNITCONFIG_PATH)
                        else:
                                config_file = given_path
                        if not os.path.exists(config_file):
                                raise ConfigError(f'Specified kunitconfig ({config_file}) does not exist')

                        partial = kunit_config.parse_file(config_file)
                        conflicts = merged.conflicting_options(partial)
                        if conflicts:
                                details = '\n\n'.join(
                                        f'{a}\n  vs from {config_file}\n{b}' for a, b in conflicts)
                                raise ConfigError(
                                        f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
                        merged.merge_in_entries(partial)
                return merged

        # No explicit configs: fall back to the one in the build directory,
        # seeding it from the default on first use.
        default_path = get_kunitconfig_path(build_dir)
        if not os.path.exists(default_path):
                shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
        return kunit_config.parse_file(default_path)
188
def get_outfile_path(build_dir: str) -> str:
        """Returns the location of the kernel output log inside build_dir."""
        outfile_path = os.path.join(build_dir, OUTFILE_PATH)
        return outfile_path
191
def _default_qemu_config_path(arch: str) -> str:
        """Returns the path of the bundled '<arch>.py' QEMU config.

        Raises:
                ConfigError: if there is no such file; the message lists the archs
                        that do have one.
        """
        candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
        if not os.path.isfile(candidate):
                # Strip the '.py' suffix to turn file names back into arch names.
                known_archs = sorted(
                        name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py'))
                raise ConfigError(arch + ' is not a valid arch, options are ' + str(known_archs))
        return candidate
199
def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
        """Loads a QEMU config file and builds the matching operations object.

        Returns:
                A (linux_arch, operations) tuple taken from the QEMU_ARCH object that
                the config module defines, with extra_qemu_args appended to its extra
                QEMU parameters.

        Raises:
                ValueError: if the loaded module does not define QEMU_ARCH.
        """
        # The module name handed to importlib does not have to match where the
        # file really lives. Basically, the real location of the config is
        # ignored for naming purposes: Python is told that the module belongs to
        # QEMU_CONFIGS_DIR no matter where the file actually exists.
        configs_dir_name = os.path.basename(QEMU_CONFIGS_DIR)
        config_file_name = os.path.basename(config_path)
        module_path = '.' + os.path.join(configs_dir_name, config_file_name)

        spec = importlib.util.spec_from_file_location(module_path, config_path)
        assert spec is not None
        config = importlib.util.module_from_spec(spec)
        # See https://github.com/python/typeshed/pull/2626 for context.
        assert isinstance(spec.loader, importlib.abc.Loader)
        spec.loader.exec_module(config)

        if hasattr(config, 'QEMU_ARCH'):
                params: qemu_config.QemuArchParams = config.QEMU_ARCH
        else:
                raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)

        if extra_qemu_args:
                params.extra_qemu_params.extend(extra_qemu_args)
        qemu_ops = LinuxSourceTreeOperationsQemu(params, cross_compile=cross_compile)
        return params.linux_arch, qemu_ops
226
class LinuxSourceTree:
        """Represents a Linux kernel source tree with KUnit tests."""

        def __init__(
              self,
              build_dir: str,
              kunitconfig_paths: Optional[List[str]]=None,
              kconfig_add: Optional[List[str]]=None,
              arch: Optional[str]=None,
              cross_compile: Optional[str]=None,
              qemu_config_path: Optional[str]=None,
              extra_qemu_args: Optional[List[str]]=None) -> None:
                """Picks the per-arch operations and loads the kunitconfig.

                An explicit qemu_config_path takes precedence over arch. Otherwise
                arch defaults to 'um' (run as UML); any other arch uses the QEMU
                config bundled for it. Entries in kconfig_add are merged into the
                parsed kunitconfig.

                Side effect: installs signal_handler() as the SIGINT handler.
                """
                signal.signal(signal.SIGINT, self.signal_handler)
                if qemu_config_path:
                        self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
                else:
                        self._arch = 'um' if arch is None else arch
                        if self._arch == 'um':
                                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
                        else:
                                qemu_config_path = _default_qemu_config_path(self._arch)
                                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

                self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
                if kconfig_add:
                        kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
                        self._kconfig.merge_in_entries(kconfig)

        def arch(self) -> str:
                """Returns the architecture this tree is built and run for."""
                return self._arch

        def clean(self) -> bool:
                """Runs `make mrproper`; logs the error and returns False on failure."""
                try:
                        self._ops.make_mrproper()
                except ConfigError as e:
                        logging.error(e)
                        return False
                return True

        def validate_config(self, build_dir: str) -> bool:
                """Checks that every requested option ended up in build_dir's .config.

                Logs the missing entries and returns False if any are absent.
                """
                kconfig_path = get_kconfig_path(build_dir)
                validated_kconfig = kunit_config.parse_file(kconfig_path)
                if self._kconfig.is_subset_of(validated_kconfig):
                        return True
                missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
                # Sorted so that the message is the same from run to run.
                message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                          'This is probably due to unsatisfied dependencies.\n' \
                          'Missing: ' + ', '.join(sorted(str(e) for e in missing))
                if self._arch == 'um':
                        message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                                   'on a different architecture with something like "--arch=x86_64".'
                logging.error(message)
                return False

        def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Writes a fresh .config into build_dir and validates it.

                Creates build_dir if needed, writes the arch-adjusted kunitconfig as
                .config, runs `make olddefconfig`, and on success records the config
                used as the last-used kunitconfig. Returns False (after logging) if
                configuring fails or requested options are missing from the result.
                """
                kconfig_path = get_kconfig_path(build_dir)
                if build_dir and not os.path.exists(build_dir):
                        os.mkdir(build_dir)
                try:
                        self._kconfig = self._ops.make_arch_config(self._kconfig)
                        self._kconfig.write_to_file(kconfig_path)
                        self._ops.make_olddefconfig(build_dir, make_options)
                except ConfigError as e:
                        logging.error(e)
                        return False
                if not self.validate_config(build_dir):
                        return False

                old_path = get_old_kunitconfig_path(build_dir)
                if os.path.exists(old_path):
                        os.remove(old_path)  # write_to_file appends to the file
                self._kconfig.write_to_file(old_path)
                return True

        def _kunitconfig_changed(self, build_dir: str) -> bool:
                """Returns True if the kunitconfig differs from the last-used copy (or none exists)."""
                old_path = get_old_kunitconfig_path(build_dir)
                if not os.path.exists(old_path):
                        return True

                old_kconfig = kunit_config.parse_file(old_path)
                return old_kconfig != self._kconfig

        def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Regenerates the .config unless the existing one is still usable.

                The existing .config is kept only if the arch-adjusted kunitconfig is
                a subset of it and the kunitconfig has not changed since the last
                successful build_config(). Returns False if (re)configuring fails.
                """
                kconfig_path = get_kconfig_path(build_dir)
                if not os.path.exists(kconfig_path):
                        print('Generating .config ...')
                        return self.build_config(build_dir, make_options)

                existing_kconfig = kunit_config.parse_file(kconfig_path)
                self._kconfig = self._ops.make_arch_config(self._kconfig)

                if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
                        return True
                print('Regenerating .config ...')
                os.remove(kconfig_path)
                return self.build_config(build_dir, make_options)

        def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Runs `make olddefconfig` and the build, then validates the .config.

                Returns False (after logging) on a configuration or build error.
                """
                try:
                        self._ops.make_olddefconfig(build_dir, make_options)
                        self._ops.make(jobs, build_dir, make_options)
                except (ConfigError, BuildError) as e:
                        logging.error(e)
                        return False
                return self.validate_config(build_dir)

        def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
                """Starts the kernel and yields its output line by line.

                The kunit.* parameters derived from the filter arguments, plus
                kunit.enable=1, are appended to a copy of `args`; the caller's list
                is not modified. Every line is also written to the output log in
                build_dir. `timeout` (seconds, None for no limit) is enforced by a
                background thread that terminates the process when it expires.
                """
                # Copy so that calling this repeatedly with the same list does not
                # accumulate the parameters added below.
                args = list(args) if args else []
                if filter_glob:
                        args.append('kunit.filter_glob=' + filter_glob)
                if filter:
                        args.append('kunit.filter="' + filter + '"')
                if filter_action:
                        args.append('kunit.filter_action=' + filter_action)
                args.append('kunit.enable=1')

                process = self._ops.start(args, build_dir)
                assert process.stdout is not None  # tell mypy it's set

                # Enforce the timeout in a background thread.
                def _wait_proc() -> None:
                        try:
                                process.wait(timeout=timeout)
                        except Exception as e:
                                print(e)
                                process.terminate()
                                process.wait()
                waiter = threading.Thread(target=_wait_proc)
                waiter.start()

                # The outer finally also runs when the log file cannot be opened, so
                # the pipe, the waiter thread and the terminal are always cleaned up.
                try:
                        with open(get_outfile_path(build_dir), 'w') as output:
                                try:
                                        # Tee the output to the file and to our caller in real time.
                                        for line in process.stdout:
                                                output.write(line)
                                                yield line
                                # This runs even if our caller doesn't consume every line.
                                finally:
                                        # Flush any leftover output to the file
                                        output.write(process.stdout.read())
                finally:
                        process.stdout.close()

                        waiter.join()
                        subprocess.call(['stty', 'sane'])

        def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
                """SIGINT handler: logs the interruption and resets the terminal with `stty sane`."""
                logging.error('Build interruption occurred. Cleaning console.')
                subprocess.call(['stty', 'sane'])