2 # -*- coding: utf-8 -*-
4 # By Gerald Combs <gerald@wireshark.org>
6 # Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
8 # SPDX-License-Identifier: GPL-2.0-or-later
10 '''File format conversion tests'''
# XXX Currently unused. It would be nice to be able to use this below.
# tshark arguments selecting the frame.number, frame.time_epoch and
# frame.time_delta fields; the same arguments are spelled out in each test.
time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta')

# Microsecond pcap, direct read was used to generate the baseline:
# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \
#   -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt
baseline_file = 'ff-ts-usec-pcap-direct.txt'
@fixtures.fixture(scope='session')
def fileformats_baseline_str(dirs):
    '''Return the contents of the timestamp baseline file as a string.

    The file named by `baseline_file` is read in text mode from
    `dirs.baseline_dir`. The fixture is declared with scope='session'.
    '''
    with open(os.path.join(dirs.baseline_dir, baseline_file), 'r') as f:
        return f.read()
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
    '''Compare tshark timestamp output for pcap captures against the baseline.

    Every test prints frame.number, frame.time_epoch and frame.time_delta
    and diffs the result against `fileformats_baseline_str`.
    '''

    def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcap stdin'''
        # The capture is fed through a '<' redirection, so the command has to
        # be a single string that is run by the shell.
        capture_proc = self.assertRun(' '.join((cmd_tshark,
                '-r', '-',
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                '<', capture_file('dhcp.pcap')
                )),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcap stdin'''
        capture_proc = self.assertRun(' '.join((cmd_tshark,
                '-r', '-',
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                '<', capture_file('dhcp-nanosecond.pcap')
                )),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcap direct'''
        capture_proc = self.assertRun((cmd_tshark,
            '-r', capture_file('dhcp-nanosecond.pcap'),
            '-Tfields',
            '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
    '''Compare tshark timestamp output for pcapng captures against the baseline.

    Every test prints frame.number, frame.time_epoch and frame.time_delta
    and diffs the result against `fileformats_baseline_str`.
    '''

    def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcapng stdin'''
        # NOTE: the comma after 'frame.time_delta' is required. Without it,
        # implicit string-literal concatenation fuses it with '<' into the
        # single argument 'frame.time_delta<'.
        capture_proc = self.assertRun(' '.join((cmd_tshark,
                '-r', '-',
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                '<', capture_file('dhcp.pcapng')
                )),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcapng direct'''
        capture_proc = self.assertRun((cmd_tshark,
            '-r', capture_file('dhcp.pcapng'),
            '-Tfields',
            '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcapng stdin'''
        # Same comma requirement as in test_pcapng_usec_stdin.
        capture_proc = self.assertRun(' '.join((cmd_tshark,
                '-r', '-',
                '-Tfields',
                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                '<', capture_file('dhcp-nanosecond.pcapng')
                )),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcapng direct'''
        capture_proc = self.assertRun((cmd_tshark,
            '-r', capture_file('dhcp-nanosecond.pcapng'),
            '-Tfields',
            '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
            ))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
@fixtures.fixture
def check_pcapng_dsb_fields(request, cmd_tshark):
    '''Factory that checks whether the DSB within the capture file matches.

    Returns a callable `check(outfile, fields)`. `fields` is an iterable of
    `(secrets_type, secrets_length, secrets_data)` tuples, where
    `secrets_data` is a bytes-like object. The callable reads `outfile` with
    tshark and asserts, on the requesting test instance, that the DSB fields
    found there equal the expected ones.
    '''
    # Assertions and process helpers live on the test case that requested
    # this fixture.
    self = request.instance

    def check_dsb_fields_real(outfile, fields):
        proc = self.assertRun((cmd_tshark,
            '-r', outfile,
            '-Xread_format:MIME Files Format',
            '-Tfields',
            '-e', 'pcapng.dsb.secrets_type',
            '-e', 'pcapng.dsb.secrets_length',
            '-e', 'pcapng.dsb.secrets_data',
            '-Y', 'pcapng.dsb.secrets_data'
        ))
        # Convert "t1,t2 l1,l2 v1,v2" -> [(t1, l1, v1), (t2, l2, v2)]
        # Columns are split on tabs and their values on commas; zip() then
        # regroups the values per DSB.
        output = proc.stdout_str.strip()
        actual = list(zip(*[column.split(",") for column in output.split('\t')]))

        def format_field(field):
            # Render one expected tuple in the same textual form as the
            # values parsed from the tshark output above.
            secrets_type, secrets_length, secrets_data = field
            data_hex = ''.join('%02x' % byte for byte in secrets_data)
            return ('0x%08x' % secrets_type, str(secrets_length), data_hex)

        expected = [format_field(field) for field in fields]
        self.assertEqual(expected, actual)

    return check_dsb_fields_real
@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
    '''Check the DSBs of pcapng files written by tshark and editcap.

    Each test writes an output file and hands it to the
    `check_pcapng_dsb_fields` fixture together with the expected
    (type, length, data) tuples. The type 0x544c534b used throughout is the
    ASCII string "TLSK".
    '''

    def test_pcapng_dsb_1(self, cmd_tshark, dirs, capture_file, check_pcapng_dsb_fields):
        '''Check that DSBs are preserved while rewriting files.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        outfile = self.filename_from_id('tls12-dsb-same.pcapng')
        self.assertRun((cmd_tshark,
            '-r', capture_file('tls12-dsb.pcapng'),
            '-w', outfile,
        ))
        # These key files are read in text mode and re-encoded, unlike the
        # keylog files below, which are read as raw bytes.
        with open(dsb_keys1, 'r') as f:
            dsb1_contents = f.read().encode('utf8')
        with open(dsb_keys2, 'r') as f:
            dsb2_contents = f.read().encode('utf8')
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(dsb1_contents), dsb1_contents),
            (0x544c534b, len(dsb2_contents), dsb2_contents),
        ))

    def test_pcapng_dsb_2(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert a single DSB into a pcapng file.'''
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = self.filename_from_id('dhe1-dsb.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('dhe1.pcapng.gz'), outfile
        ))
        with open(key_file, 'rb') as f:
            keylog_contents = f.read()
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog_contents), keylog_contents),
        ))

    def test_pcapng_dsb_3(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert two DSBs into a pcapng file.'''
        key_file1 = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        key_file2 = os.path.join(dirs.key_dir, 'http2-data-reassembly.keys')
        outfile = self.filename_from_id('dhe1-dsb.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file1,
            '--inject-secrets', 'tls,%s' % key_file2,
            capture_file('dhe1.pcapng.gz'), outfile
        ))
        with open(key_file1, 'rb') as f:
            keylog1_contents = f.read()
        with open(key_file2, 'rb') as f:
            keylog2_contents = f.read()
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog1_contents), keylog1_contents),
            (0x544c534b, len(keylog2_contents), keylog2_contents),
        ))

    def test_pcapng_dsb_4(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert a single DSB into a pcapng file with existing DSBs.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = self.filename_from_id('tls12-dsb-extra.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('tls12-dsb.pcapng'), outfile
        ))
        with open(dsb_keys1, 'r') as f:
            dsb1_contents = f.read().encode('utf8')
        with open(dsb_keys2, 'r') as f:
            dsb2_contents = f.read().encode('utf8')
        with open(key_file, 'rb') as f:
            keylog_contents = f.read()
        # New DSBs are inserted before the first record. Due to the current
        # implementation, this is inserted before other (existing) DSBs. This
        # might change in the future if it is deemed more logical.
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog_contents), keylog_contents),
            (0x544c534b, len(dsb1_contents), dsb1_contents),
            (0x544c534b, len(dsb2_contents), dsb2_contents),
        ))
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_mime(subprocesstest.SubprocessTestCase):
    '''Tests that read a capture with the "MIME Files Format" reader.'''

    def test_mime_pcapng_gz(self, cmd_tshark, capture_file):
        '''Test that the full uncompressed contents is shown.'''
        proc = self.assertRun((cmd_tshark,
            '-r', capture_file('icmp.pcapng.gz'),
            '-Xread_format:MIME Files Format',
            '-Tfields', '-e', 'frame.len', '-e', 'pcapng.block.length',
        ))
        # Expected output: frame.len, a tab, then the comma-separated
        # pcapng.block.length values.
        self.assertEqual(proc.stdout_str.strip(), '480\t128,128,88,88,132,132,132,132')