python:samba:tests: Fix code spelling
[samba.git] / python / samba / tests / logfiles.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Catalyst.Net Ltd. 2022
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
19 import subprocess
20 import os
21 from samba.tests import TestCaseInTempDir
22 from pprint import pprint
23
# Directory containing this module. The two helper binaries are looked
# up relative to it, in a bin/ directory four levels further up.
HERE = os.path.dirname(__file__)
S4_SERVER = os.path.join(HERE, '../../../../bin/test_s4_logging')
S3_SERVER = os.path.join(HERE, '../../../../bin/test_s3_logging')

# Debug class names. The order matters: a name's position in this list
# is the numeric code expected next to it in the log output.
CLASS_LIST = """
    all tdb printdrivers lanman smb
    rpc_parse rpc_srv rpc_cli passdb sam auth
    winbind vfs idmap quota acls locking msdfs
    dmapi registry scavenger dns ldb tevent
    auth_audit auth_json_audit kerberos drs_repl
    smb2 smb2_credits dsdb_audit dsdb_json_audit
    dsdb_password_audit dsdb_password_json_audit
    dsdb_transaction_audit dsdb_transaction_json_audit
    dsdb_group_audit dsdb_group_json_audit
""".split()


# Debug class name -> its index in CLASS_LIST.
CLASS_CODES = dict(zip(CLASS_LIST, range(len(CLASS_LIST))))
40
41
class S4LoggingTests(TestCaseInTempDir):
    """Check that per-class debug output ends up in the expected place.

    Each test writes an smb.conf whose 'log level' value maps debug
    classes to levels and, optionally, to log files.  It then runs the
    helper binary named by `server` and checks the "logging for ..."
    lines found in the log files (or on stderr when run with -i).
    """
    server = S4_SERVER

    def _write_smb_conf(self,
                        default_level=2,
                        default_file="default",
                        mapping=()):
        """Write smb.conf into the temp dir and store its path.

        default_level: bare level written first in the 'log level'
            value; nothing is written for it if it is false.
        default_file: name, relative to the temp dir, used for the
            'log file' option; if None no 'log file' line is written.
        mapping: iterable of (dbg_class, log_level, log_file) tuples.
            Each is appended to the 'log level' value in the form
            "dbg_class[:log_level][@path]", where a None level or
            file leaves that part out.

        The path is kept in self.smbconf, and the file is unlinked on
        test cleanup.
        """
        self.smbconf = os.path.join(self.tempdir, "smb.conf")

        with open(self.smbconf, "w") as f:
            f.write('[global]\n')
            if default_file is not None:
                dest = os.path.join(self.tempdir, default_file)
                f.write(f"    log file = {dest}\n")

            f.write("    log level = ")
            if default_level:
                f.write(f"{default_level}")

            for dbg_class, log_level, log_file in mapping:
                f.write(' ')
                f.write(dbg_class)
                if log_level is not None:
                    f.write(f':{log_level}')
                if log_file is not None:
                    dest = os.path.join(self.tempdir, log_file)
                    f.write(f'@{dest}')
            f.write('\n')
        self.addCleanup(os.unlink, self.smbconf)

    def _extract_log_level_line(self, new_level=2):
        """Pull the 'log level' value out of smb.conf and return it.

        The smb.conf is rewritten in place.  The 'log level' line is
        replaced with an innocuous "log level = <new_level>", or
        dropped altogether if new_level is None.

        The test fails if the smb.conf has no 'log level' line.
        """
        smbconf2 = self.smbconf + 'new'
        debug_arg = None
        with open(self.smbconf) as f, open(smbconf2, 'w') as f2:
            for line in f:
                if 'log level' in line:
                    debug_arg = line.split('=', 1)[1].strip()
                    if new_level is not None:
                        f2.write(f'    log level = {new_level}\n')
                else:
                    f2.write(line)
        os.replace(smbconf2, self.smbconf)
        if debug_arg is None:
            # Without this we would hit an UnboundLocalError on return.
            self.fail(f"no 'log level' line found in {self.smbconf}")
        return debug_arg

    def _get_expected_strings(self, mapping,
                              level_filter,
                              default_file='default',
                              file_filter=None):
        """Work out which lines each log file should contain.

        mapping: (dbg_class, log_level, log_file) tuples; a None
            log_file means the default file.
        level_filter: the level the messages are logged at; classes
            whose log_level is lower than this contribute no line.
        default_file: name of the default log file in the temp dir.
        file_filter: if not None, only classes mapped to this file
            name contribute lines.

        Returns a dict mapping each log file path to its list of
        expected lines (possibly empty).  The default file always has
        an entry.
        """
        default = os.path.join(self.tempdir, default_file)
        expected = {default: []}
        # this kind of thing:
        # "  logging for 'dns' [21], at level 4"
        for dbg_class, log_level, log_file in mapping:
            if log_file is None:
                log_file = default_file

            path = os.path.join(self.tempdir, log_file)
            lines = expected.setdefault(path, [])
            if log_level < level_filter:
                continue
            if file_filter not in (None, log_file):
                continue
            lines.append(
                f"  logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
                f"at level {level_filter}")

        return expected

    def _run_s4_logger(self, log_level, *extra_args):
        """Run the logging helper and return its (stdout, stderr) as str.

        The helper is invoked with '-s <smb.conf>' and '-L <log_level>',
        followed by any extra_args.  The test fails if it exits
        non-zero; the failure message includes the helper's stderr.
        """
        cmd = [self.server,
               '-s', self.smbconf,
               '-L', str(log_level),
               *extra_args]

        p = subprocess.run(cmd,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
        self.assertEqual(
            p.returncode, 0,
            f"'{' '.join(cmd)}' failed ({p.returncode}); stderr:\n"
            f"{p.stderr.decode(errors='replace')}")

        return p.stdout.decode(), p.stderr.decode()

    @staticmethod
    def _expected_stderr_lines(expected_strings):
        """Flatten per-file expected lines into the lines expected on stderr."""
        # stderr doesn't end up with leading '  '
        return [line.strip()
                for lines in expected_strings.values()
                for line in lines]

    @staticmethod
    def _misc_level_mapping(level):
        """Build a mapping that mixes destinations and levels.

        We are sending even numbers to default and odd numbers to
        'odd', at various levels, depending on mod 3. With level 4 it
        looks like this:

        log level = 2 all:5 \
                      tdb:4@odd \
                      printdrivers:3 \
                      lanman:5@odd \
                      smb:4 \
                      rpc_parse:3@odd \
                      rpc_srv:5 ...

        Therefore, 'default' should get classes that are (0 or 4) % 6
        and 'odd' should get classes that are (1 or 3) % 6.
        """
        dests = [None, 'odd']
        return [(x, level + 1 - (i % 3), dests[i & 1])
                for i, x in enumerate(CLASS_LIST)]

    @staticmethod
    def _scrambled_mapping(seed, multiplier):
        """Build a deterministic, scrambled mapping over all classes.

        Each class gets a level from 0 to 9 and one of four
        destinations (the default file, 'a', 'b' or 'c'), taken from a
        simple sequence driven by seed and multiplier.
        """
        dests = [None, 'a', 'b', 'c']
        mapping = []
        for x in CLASS_LIST:
            d = seed & 3
            seed = seed * multiplier + 1
            log_level = seed % 10
            seed &= 0xff
            mapping.append((x, log_level, dests[d]))
        return mapping

    def assert_string_contains(self, string, expected_lines,
                               filename=None):
        """Fail unless every expected line is a whole line of string.

        On failure the missing lines are printed, after filename if
        one was given.
        """
        expected_lines = set(expected_lines)
        string_lines = set(string.split('\n'))
        present_lines = string_lines & expected_lines
        if present_lines != expected_lines:
            if filename:
                print(filename)
            print("expected %d lines, found %d" %
                  (len(expected_lines), len(present_lines)))
            print("missing lines:")
            pprint(expected_lines - present_lines)
            raise AssertionError("missing lines")

    def assert_file_contains(self, filename, expected_lines):
        """Fail unless the file contains every expected line."""
        with open(filename) as f:
            string = f.read()
        self.assert_string_contains(string, expected_lines, filename)

    def assert_n_known_lines_string(self, string, n):
        """Fail unless string has exactly n "logging for '" occurrences."""
        count = string.count("logging for '")
        if count != n:
            raise AssertionError(
                f"string has {count} lines, expected {n}")

    def assert_n_known_lines(self, filename, n):
        """Fail unless the file has exactly n "  logging for '" occurrences.

        Unlike assert_n_known_lines_string(), the two leading spaces
        are part of what is counted.
        """
        with open(filename) as f:
            string = f.read()
        count = string.count("  logging for '")
        if count != n:
            raise AssertionError(
                f"{filename} has {count} lines, expected {n}")

    def assert_unlink_expected_strings(self, expected_strings):
        """Check each expected log file, then remove it.

        Every file in expected_strings must exist, must contain all of
        its expected lines, and must contain no other "logging for"
        lines.
        """
        for k, v in expected_strings.items():
            if not os.path.exists(k):
                self.fail(f"{k} does not exist")
            self.assert_file_contains(k, v)
            self.assert_n_known_lines(k, len(v))
            os.unlink(k)

    def test_each_to_its_own(self):
        level = 4
        mapping = [(x, level, x) for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_to_one(self):
        level = 4
        dest = 'everything'
        mapping = [(x, level, dest) for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate(self):
        level = 4
        dests = ['even', 'odd']
        mapping = [(x, level + 1, dests[i & 1])
                   for i, x in enumerate(CLASS_LIST)]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate_level_out_of_range(self):
        # nothing will be logged, because we're logging at a too high
        # level.
        level = 4
        dests = ['even', 'odd']
        mapping = [(x, level - 1, dests[i & 1])
                   for i, x in enumerate(CLASS_LIST)]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate_misc_log_level(self):
        # Even classes go to default and odd classes to 'odd', at
        # levels that vary with the index mod 3.
        level = 4
        mapping = self._misc_level_mapping(level)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_different_ways_cmdline_d(self):
        # The class mapping is passed via -d on the command line,
        # while smb.conf is left with a plain 'log level = 26'.
        level = 4
        mapping = self._scrambled_mapping(123, 17)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        debug_arg = self._extract_log_level_line(26)

        self._run_s4_logger(level, '-d', debug_arg)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_different_ways_cmdline_d_interactive(self):
        # As above, but with -i and no 'log level' line left in
        # smb.conf; the expected lines are looked for on stderr.
        level = 4
        mapping = self._scrambled_mapping(1234, 13)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        debug_arg = self._extract_log_level_line(None)
        _, stderr = self._run_s4_logger(level, '-d', debug_arg, '-i')
        expected_lines = self._expected_stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)
        self.assert_n_known_lines_string(stderr, len(expected_lines))

    def test_only_some_level_0(self):
        # running the logger with -L 0 makes the log messages run at
        # level 0 (i.e DBG_ERR), so we always see them in default,
        # even though smb.conf doesn't ask.
        mapping = [(x, 3, 'bees' if 'b' in x else 'default')
                   for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, 0)
        self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
        self._run_s4_logger(0)
        self.assert_unlink_expected_strings(expected_strings)

    def test_only_some_level_3(self):
        # here, we're expecting the unmentioned non-b classes to just
        # disappear.
        level = 3
        mapping = [(x, level, 'bees') for x in CLASS_LIST if 'b' in x]
        expected_strings = self._get_expected_strings(mapping, level)
        # every entry already targets 'bees', so the whole mapping is
        # written out.
        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none(self):
        level = 4
        mapping = []
        expected_strings = self._get_expected_strings(mapping, level)
        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none_high_default(self):
        # We set the default level to 5 and do nothing else special,
        # which means we need a different mapping for the smb.conf
        # than the expected strings.
        level = 4
        mapping = [(x, 5, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        # note the empty mapping in smb.conf
        self._write_smb_conf(mapping=[], default_level=5)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none_high_cmdline_d(self):
        # We set the default level to 2, but run the 'server' with -d 10.
        level = 4
        mapping = [(x, 10, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        # note the empty mapping in smb.conf
        self._write_smb_conf(mapping=[])
        self._run_s4_logger(level, '-d', '10')
        self.assert_unlink_expected_strings(expected_strings)

    def test_interactive_high_default_simple(self):
        # running with -i should send everything to stderr.
        level = 4
        mapping = [(x, 5, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        self._write_smb_conf(mapping=[], default_level=5)
        _, stderr = self._run_s4_logger(level, '-i')
        expected_lines = self._expected_stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)

    def test_interactive_complex_smb_conf(self):
        # running with -i should send everything to stderr. The
        # smb.conf will set the levels, but the target files are
        # overridden.
        # (this is the test_bifurcate_misc_log_level() smb.conf).
        level = 4
        mapping = self._misc_level_mapping(level)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        _, stderr = self._run_s4_logger(level, '-i')
        expected_lines = self._expected_stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)
375
376
class S3LoggingTests(S4LoggingTests):
    # The inherited tests were developed against the test_logger as
    # linked with CMDLINE_S4 (see lib/util/wscript_build). They can
    # equally run against the build linked with CMDLINE_S3, so this
    # class only swaps the binary.
    server = S3_SERVER