Test: Add fileformats and I/O.
authorGerald Combs <gerald@wireshark.org>
Fri, 27 Apr 2018 17:35:17 +0000 (10:35 -0700)
committerGerald Combs <gerald@wireshark.org>
Fri, 27 Apr 2018 19:52:04 +0000 (19:52 +0000)
Add the fileformats and I/O suites. Move some more common code to
subprocesstest.py and add a diffOutput method.

Change-Id: I2ec34e46539022bdce78520645fdca6dfc1a8c1a
Reviewed-on: https://code.wireshark.org/review/27183
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Gerald Combs <gerald@wireshark.org>
13 files changed:
docbook/wsdg_src/WSDG_chapter_tests.asciidoc
test/baseline/ff-ts-usec-pcap-direct.txt [new file with mode: 0644]
test/config.py
test/subprocesstest.py
test/suite_capture.py
test/suite_clopts.py
test/suite_decryption.py
test/suite_dissection.py
test/suite_fileformats.py [new file with mode: 0644]
test/suite_io.py [new file with mode: 0644]
test/test.py
test/util_dump_dhcp_pcap.py [new file with mode: 0755]
test/util_slow_dhcp_pcap.py [deleted file]

index 249e5a416d853a127f62343058087d60f8cafc54..092ebe88ccbb07ebb2faa9af62852d8994b31461 100644 (file)
@@ -22,7 +22,10 @@ To run all tests from CMake do the following:
 * Pass `-DTEST_EXTRA_ARGS=--disable-capture` or
   `-DTEST_EXTRA_ARGS=--capture-interface=<interface>`
   as needed for your system.
-* Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`.
+* Build the “test” target or run ctest, e.g. `ctest --force-new-ctest-process -j 4 --verbose`.
+
+On Windows, “ctest” requires a build configuration parameter, e.g.
+`ctest -C RelWithDebInfo --force-new-ctest-process -j 4 --verbose`.
 
 To run all tests directly, run `test.py -p
 /path/to/wireshark-build/run-directory <capture args>`.
@@ -168,3 +171,7 @@ class case_decrypt_80211(subprocesstest.SubprocessTestCase):
         self.assertTrue(self.grepOutput('favicon.ico'))
 ----
 
+Tests can be run in parallel. This means that any files you create must
+be unique for each test. “subprocesstest.filename_from_id” can be used
+to generate a filename based on the current test name. It also ensures
+that the file will be automatically removed after the test has run.
diff --git a/test/baseline/ff-ts-usec-pcap-direct.txt b/test/baseline/ff-ts-usec-pcap-direct.txt
new file mode 100644 (file)
index 0000000..f4ac417
--- /dev/null
@@ -0,0 +1,4 @@
+1      1102274184.317453000    0.000000000
+2      1102274184.317748000    0.000295000
+3      1102274184.387484000    0.069736000
+4      1102274184.387798000    0.000314000
index e767a4aa10fee6182daa847278a81d2fc00574b5..5b34d13f40846656189e6bfcb55a865acc33a67c 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
@@ -16,10 +17,11 @@ import sys
 import tempfile
 
 commands = (
+    'capinfos',
     'dumpcap',
+    'rawshark',
     'tshark',
     'wireshark',
-    'capinfos',
 )
 
 can_capture = False
@@ -27,10 +29,11 @@ capture_interface = None
 
 # Our executables
 # Strings
-cmd_tshark = None
+cmd_capinfos = None
 cmd_dumpcap = None
+cmd_rawshark = None
+cmd_tshark = None
 cmd_wireshark = None
-cmd_capinfos = None
 # Arrays
 args_ping = None
 
index 18313db795d371f2d49285b9ff43db2b3b73574c..357104b4616fa44498f66c54a72d1aa7da503e25 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
@@ -8,6 +9,8 @@
 #
 '''Subprocess test case superclass'''
 
+import config
+import difflib
 import io
 import os
 import os.path
@@ -25,6 +28,32 @@ import unittest
 if sys.version_info[0] >= 3:
     process_timeout = 300 # Seconds
 
+def capture_command(cmd, *args, **kwargs):
+    '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
+
+    If shell is true, return a string. Otherwise, return a list of arguments.'''
+    shell = kwargs.pop('shell', False)
+    if shell:
+        cap_cmd = ['"' + cmd + '"']
+    else:
+        cap_cmd = [cmd]
+    if cmd == config.cmd_wireshark:
+        cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
+    cap_cmd += args
+    if shell:
+        return ' '.join(cap_cmd)
+    else:
+        return cap_cmd
+
+def cat_dhcp_command(mode):
+    '''Create a command string for dumping dhcp.pcap to stdout'''
+    # XXX Do this in Python in a thread?
+    sd_cmd = ''
+    if sys.executable:
+        sd_cmd = sys.executable + ' '
+    sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
+    return sd_cmd
+
 class LoggingPopen(subprocess.Popen):
     '''Run a process using subprocess.Popen. Capture and log its output.
 
@@ -104,7 +133,10 @@ class SubprocessTestCase(unittest.TestCase):
 
     def filename_from_id(self, filename):
         '''Generate a filename prefixed with our test ID.'''
-        return self.id() + '.' + filename
+        id_filename = self.id() + '.' + filename
+        if id_filename not in self.cleanup_files:
+            self.cleanup_files.append(id_filename)
+        return id_filename
 
     def kill_processes(self):
         '''Kill any processes we've opened so far'''
@@ -158,6 +190,18 @@ class SubprocessTestCase(unittest.TestCase):
                 pass
         self.cleanup_files = []
 
+    def checkPacketCount(self, num_packets, cap_file=None):
+        got_num_packets = False
+        if not cap_file:
+            cap_file = self.filename_from_id('testout.pcap')
+        self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
+        capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
+        self.log_fd_write_bytes(capinfos_testout)
+        count_pat = 'Number of packets:\s+{}'.format(num_packets)
+        if re.search(count_pat, capinfos_testout):
+            got_num_packets = True
+        self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
+
     def countOutput(self, search_pat, proc=None):
         '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
         match_count = 0
@@ -174,6 +218,23 @@ class SubprocessTestCase(unittest.TestCase):
     def grepOutput(self, search_pat, proc=None):
         return self.countOutput(search_pat, proc) > 0
 
+    def diffOutput(self, blob_a, blob_b, *args, **kwargs):
+        '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
+
+        blob_a and blob_b must be UTF-8 strings.'''
+        lines_a = blob_a.splitlines()
+        lines_b = blob_b.splitlines()
+        diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
+        if len(diff) > 0:
+            if sys.version_info[0] < 3 and not isinstance(diff, unicode):
+                diff = unicode(diff, 'UTF-8', 'replace')
+            self.log_fd.flush()
+            self.log_fd.write(u'-- Begin diff output --\n')
+            self.log_fd.writelines(diff)
+            self.log_fd.write(u'-- End diff output --\n')
+            return False
+        return True
+
     def startProcess(self, proc_args, env=None, shell=False):
         '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().'''
         proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
index ff6804f5df80c74ec77cd58e81a4b504597a4aaa..d76e2e24e0f33ab6cf7e378281efd20f3457ffcc 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
@@ -24,28 +25,6 @@ snapshot_len = 96
 capture_env = os.environ.copy()
 capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
 
-def capture_command(cmd, *args, **kwargs):
-    shell = kwargs.pop('shell', False)
-    if shell:
-        cap_cmd = ['"' + cmd + '"']
-    else:
-        cap_cmd = [cmd]
-    if cmd == config.cmd_wireshark:
-        cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
-    cap_cmd += args
-    if shell:
-        return ' '.join(cap_cmd)
-    else:
-        return cap_cmd
-
-def slow_dhcp_command():
-    # XXX Do this in Python in a thread?
-    sd_cmd = ''
-    if sys.executable:
-        sd_cmd = '"{}" '.format(sys.executable)
-    sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py')
-    return sd_cmd
-
 def start_pinging(self):
     ping_procs = []
     if sys.platform.startswith('win32'):
@@ -61,19 +40,8 @@ def stop_pinging(ping_procs):
     for proc in ping_procs:
         proc.kill()
 
-def check_testout_num_packets(self, num_packets, cap_file=None):
-    got_num_packets = False
-    if not cap_file:
-        cap_file = self.filename_from_id(testout_pcap)
-    self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
-    capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
-    self.log_fd_write_bytes(capinfos_testout)
-    count_pat = 'Number of packets:\s+{}'.format(num_packets)
-    if re.search(count_pat, capinfos_testout):
-        got_num_packets = True
-    self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
-
 def check_capture_10_packets(self, cmd=None, to_stdout=False):
+    # Similar to suite_io.check_io_4_packets.
     if not config.canCapture():
         self.skipTest('Test requires capture privileges and an interface.')
     if cmd == config.cmd_wireshark and not config.canDisplay():
@@ -84,7 +52,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
     testout_file = self.filename_from_id(testout_pcap)
     ping_procs = start_pinging(self)
     if to_stdout:
-        capture_proc = self.runProcess(capture_command(cmd,
+        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
             '-i', '"{}"'.format(config.capture_interface),
             '-p',
             '-w', '-',
@@ -98,7 +66,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
         shell=True
         )
     else:
-        capture_proc = self.runProcess(capture_command(cmd,
+        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
             '-i', config.capture_interface,
             '-p',
             '-w', testout_file,
@@ -110,13 +78,12 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
         )
     capture_returncode = capture_proc.returncode
     stop_pinging(ping_procs)
-    self.cleanup_files.append(testout_file)
     if capture_returncode != 0:
         self.log_fd.write('{} -D output:\n'.format(cmd))
         self.runProcess((cmd, '-D'))
     self.assertEqual(capture_returncode, 0)
     if (capture_returncode == 0):
-        check_testout_num_packets(self, 10)
+        self.checkPacketCount(10)
 
 def check_capture_fifo(self, cmd=None):
     if not config.canMkfifo():
@@ -127,18 +94,17 @@ def check_capture_fifo(self, cmd=None):
     capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
     testout_file = self.filename_from_id(testout_pcap)
     fifo_file = self.filename_from_id('testout.fifo')
-    self.cleanup_files.append(fifo_file)
     try:
         # If a previous test left its fifo laying around, e.g. from a failure, remove it.
         os.unlink(fifo_file)
     except:
         pass
     os.mkfifo(fifo_file)
-    slow_dhcp_cmd = slow_dhcp_command()
+    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
     fifo_proc = self.startProcess(
         ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
         shell=True)
-    capture_proc = self.runProcess(capture_command(cmd,
+    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
         '-i', fifo_file,
         '-p',
         '-w', testout_file,
@@ -146,22 +112,22 @@ def check_capture_fifo(self, cmd=None):
     ),
     env=capture_env
     )
-    self.cleanup_files.append(testout_file)
     fifo_proc.kill()
     self.assertTrue(os.path.isfile(testout_file))
     capture_returncode = capture_proc.returncode
     self.assertEqual(capture_returncode, 0)
     if (capture_returncode == 0):
-        check_testout_num_packets(self, 8)
+        self.checkPacketCount(8)
 
 def check_capture_stdin(self, cmd=None):
+    # Similar to suite_io.check_io_4_packets.
     if cmd == config.cmd_wireshark and not config.canDisplay():
         self.skipTest('Test requires a display.')
     self.assertIsNotNone(cmd)
     capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
     testout_file = self.filename_from_id(testout_pcap)
-    slow_dhcp_cmd = slow_dhcp_command()
-    capture_cmd = capture_command(cmd,
+    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+    capture_cmd = subprocesstest.capture_command(cmd,
         '-i', '-',
         '-w', testout_file,
         '-a', 'duration:{}'.format(capture_duration),
@@ -170,7 +136,6 @@ def check_capture_stdin(self, cmd=None):
     if cmd == config.cmd_wireshark:
         capture_cmd += ' -o console.log.level:127'
     pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
-    self.cleanup_files.append(testout_file)
     pipe_returncode = pipe_proc.returncode
     self.assertEqual(pipe_returncode, 0)
     if cmd == config.cmd_wireshark:
@@ -179,7 +144,7 @@ def check_capture_stdin(self, cmd=None):
         self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
     self.assertTrue(os.path.isfile(testout_file))
     if (pipe_returncode == 0):
-        check_testout_num_packets(self, 8)
+        self.checkPacketCount(8)
 
 def check_capture_2multi_10packets(self, cmd=None):
     # This was present in the Bash version but was incorrect and not part of any suite.
@@ -196,7 +161,7 @@ def check_capture_read_filter(self, cmd=None):
     self.assertIsNotNone(cmd)
     ping_procs = start_pinging(self)
     testout_file = self.filename_from_id(testout_pcap)
-    capture_proc = self.runProcess(capture_command(cmd,
+    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
         '-i', config.capture_interface,
         '-p',
         '-w', testout_file,
@@ -210,11 +175,10 @@ def check_capture_read_filter(self, cmd=None):
     )
     capture_returncode = capture_proc.returncode
     stop_pinging(ping_procs)
-    self.cleanup_files.append(testout_file)
     self.assertEqual(capture_returncode, 0)
 
     if (capture_returncode == 0):
-        check_testout_num_packets(self, 0)
+        self.checkPacketCount(0)
 
 def check_capture_snapshot_len(self, cmd=None):
     if not config.canCapture():
@@ -226,7 +190,7 @@ def check_capture_snapshot_len(self, cmd=None):
     self.assertIsNotNone(cmd)
     ping_procs = start_pinging(self)
     testout_file = self.filename_from_id(testout_pcap)
-    capture_proc = self.runProcess(capture_command(cmd,
+    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
         '-i', config.capture_interface,
         '-p',
         '-w', testout_file,
@@ -238,7 +202,6 @@ def check_capture_snapshot_len(self, cmd=None):
     )
     capture_returncode = capture_proc.returncode
     stop_pinging(ping_procs)
-    self.cleanup_files.append(testout_file)
     self.assertEqual(capture_returncode, 0)
     self.assertTrue(os.path.isfile(testout_file))
 
@@ -251,10 +214,9 @@ def check_capture_snapshot_len(self, cmd=None):
         '-Y', 'frame.cap_len>{}'.format(snapshot_len),
     ))
     filter_returncode = filter_proc.returncode
-    self.cleanup_files.append(testout2_file)
     self.assertEqual(capture_returncode, 0)
     if (capture_returncode == 0):
-        check_testout_num_packets(self, 0, cap_file=testout2_file)
+        self.checkPacketCount(0, cap_file=testout2_file)
 
 class case_wireshark_capture(subprocesstest.SubprocessTestCase):
     def test_wireshark_capture_10_packets_to_file(self):
index 3516d5fa36cb19f83bf86cb699f44a8e699585f9..a5312c4ad5b24ab2b96ab0d91da0c8f5422b0324 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
@@ -17,6 +18,7 @@ import unittest
 #glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
 
 glossaries = ('decodes', 'values')
+testout_pcap = 'testout.pcap'
 
 class case_dumpcap_invalid_chars(subprocesstest.SubprocessTestCase):
     # XXX Should we generate individual test functions instead of looping?
@@ -51,7 +53,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_filter = '__invalid_protocol'
         # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', 'testout.pcap' ))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', testout_file ))
         self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
 
     def test_dumpcap_invalid_interface_name(self):
@@ -60,7 +63,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_interface = '__invalid_interface'
         # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', 'testout.pcap'))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
         self.assertTrue(self.grepOutput('The capture session could not be initiated'))
 
     def test_dumpcap_invalid_interface_index(self):
@@ -69,7 +73,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_index = '0'
         # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', 'testout.pcap'))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
         self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
 
 
@@ -119,7 +124,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_filter = '__invalid_protocol'
         # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', 'testout.pcap' ))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
         self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
 
     def test_tshark_invalid_interface_name(self):
@@ -128,7 +134,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_interface = '__invalid_interface'
         # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', 'testout.pcap'))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', testout_file))
         self.assertTrue(self.grepOutput('The capture session could not be initiated'))
 
     def test_tshark_invalid_interface_index(self):
@@ -137,7 +144,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
             self.skipTest('Test requires capture privileges and an interface.')
         invalid_index = '0'
         # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
-        self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', 'testout.pcap'))
+        testout_file = self.filename_from_id(testout_pcap)
+        self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', testout_file))
         self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
 
 
index 5b0bc77b838b7e6820936bfcea945c3a9d3f8c3e..e5092a19c527affb401025a27d89ac7fd24697bb 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
index cfdab9253be8a5d1cca7d0e74ae765c3a380afd0..b81d048b536de3d356d3a23b009e47f97a389aad 100644 (file)
@@ -1,4 +1,5 @@
 #
+# -*- coding: utf-8 -*-
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
 #
diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py
new file mode 100644 (file)
index 0000000..6ebf88d
--- /dev/null
@@ -0,0 +1,112 @@
+#
+# -*- coding: utf-8 -*-
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''File format conversion tests'''
+
+import config
+import io
+import os.path
+import subprocesstest
+import sys
+import unittest
+
+# XXX Currently unused. It would be nice to be able to use this below.
+time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta')
+
+# Microsecond pcap, direct read was used to generate the baseline:
+# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \
+#   -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt
+baseline_file = 'ff-ts-usec-pcap-direct.txt'
+baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace')
+baseline_str = baseline_fd.read()
+baseline_fd.close()
+
+class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
+    def test_pcap_usec_stdin(self):
+        '''Microsecond pcap direct vs microsecond pcap stdin'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', '-',
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+                '<', capture_file
+                , shell=True),
+            shell=True)
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+    def test_pcap_nsec_stdin(self):
+        '''Microsecond pcap direct vs nanosecond pcap stdin'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', '-',
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+                '<', capture_file
+                , shell=True),
+            shell=True)
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+    def test_pcap_nsec_direct(self):
+        '''Microsecond pcap direct vs nanosecond pcap direct'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+                ),
+            )
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
+    def test_pcapng_usec_stdin(self):
+        '''Microsecond pcap direct vs microsecond pcapng stdin'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', '-',
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
+                '<', capture_file
+                , shell=True),
+            shell=True)
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+    def test_pcapng_usec_direct(self):
+        '''Microsecond pcap direct vs microsecond pcapng direct'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+                ),
+            )
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+    def test_pcapng_nsec_stdin(self):
+        '''Microsecond pcap direct vs nanosecond pcapng stdin'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', '-',
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
+                '<', capture_file
+                , shell=True),
+            shell=True)
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+    def test_pcapng_nsec_direct(self):
+        '''Microsecond pcap direct vs nanosecond pcapng direct'''
+        capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng')
+        capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+                ),
+            )
+        self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
diff --git a/test/suite_io.py b/test/suite_io.py
new file mode 100644 (file)
index 0000000..c8591ce
--- /dev/null
@@ -0,0 +1,89 @@
+#
+# -*- coding: utf-8 -*-
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''File I/O tests'''
+
+import config
+import io
+import os.path
+import subprocesstest
+import sys
+import unittest
+
+testout_pcap = 'testout.pcap'
+baseline_file = 'io-rawshark-dhcp-pcap.txt'
+baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace')
+baseline_str = baseline_fd.read()
+baseline_fd.close()
+
+def check_io_4_packets(self, cmd=None, from_stdin=False, to_stdout=False):
+    # Test direct->direct, stdin->direct, and direct->stdout file I/O.
+    # Similar to suite_capture.check_capture_10_packets and
+    # suite_capture.check_capture_stdin.
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    self.assertIsNotNone(cmd)
+    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+    testout_file = self.filename_from_id(testout_pcap)
+    if from_stdin and to_stdout:
+        # XXX If we support this, should we bother with separate stdin->direct
+        # and direct->stdout tests?
+        self.fail('Stdin and stdout not supported in the same test.')
+    elif from_stdin:
+        # cat -B "${CAPTURE_DIR}dhcp.pcap" | $DUT -r - -w ./testout.pcap 2>./testout.txt
+        cat_dhcp_cmd = subprocesstest.cat_dhcp_command('cat')
+        stdin_cmd = '{0} | "{1}" -r - -w "{2}"'.format(cat_dhcp_cmd, cmd, testout_file)
+        io_proc = self.runProcess(stdin_cmd, shell=True)
+    elif to_stdout:
+        # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w - > ./testout.pcap 2>./testout.txt
+        stdout_cmd = '"{0}" -r "{1}" -w - > "{2}"'.format(cmd, capture_file, testout_file)
+        io_proc = self.runProcess(stdout_cmd, shell=True)
+    else: # direct->direct
+        # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1
+        capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+        io_proc = self.runProcess(subprocesstest.capture_command(cmd,
+            '-r', capture_file,
+            '-w', testout_file,
+        ))
+    io_returncode = io_proc.returncode
+    self.assertEqual(io_returncode, 0)
+    self.assertTrue(os.path.isfile(testout_file))
+    if (io_returncode == 0):
+        self.checkPacketCount(4)
+
+class case_tshark_io(subprocesstest.SubprocessTestCase):
+    def test_tshark_io_stdin_direct(self):
+        '''Read from stdin and write direct using TShark'''
+        check_io_4_packets(self, cmd=config.cmd_tshark, from_stdin=True)
+
+    def test_tshark_io_direct_stdout(self):
+        '''Read direct and write to stdout using TShark'''
+        check_io_4_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+
+    def test_tshark_io_direct_direct(self):
+        '''Read direct and write direct using TShark'''
+        check_io_4_packets(self, cmd=config.cmd_tshark)
+
+# The Bash version didn't test Wireshark or dumpcap
+
+class case_rawshark_io(subprocesstest.SubprocessTestCase):
+    @unittest.skipUnless(sys.byteorder == 'little', 'Requires a little endian system')
+    def test_rawshark_io_stdin(self):
+        '''Read from stdin using Rawshark'''
+        # tail -c +25 "${CAPTURE_DIR}dhcp.pcap" | $RAWSHARK -dencap:1 -R "udp.port==68" -nr - > $IO_RAWSHARK_DHCP_PCAP_TESTOUT 2> /dev/null
+        # diff -u --strip-trailing-cr $IO_RAWSHARK_DHCP_PCAP_BASELINE $IO_RAWSHARK_DHCP_PCAP_TESTOUT > $DIFF_OUT 2>&1
+        capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+        testout_file = self.filename_from_id(testout_pcap)
+        raw_dhcp_cmd = subprocesstest.cat_dhcp_command('raw')
+        rawshark_cmd = '{0} | "{1}" -r - -n -dencap:1 -R "udp.port==68"'.format(raw_dhcp_cmd, config.cmd_rawshark)
+        rawshark_proc = self.runProcess(rawshark_cmd, shell=True)
+        rawshark_returncode = rawshark_proc.returncode
+        self.assertEqual(rawshark_returncode, 0)
+        if (rawshark_returncode == 0):
+            self.assertTrue(self.diffOutput(rawshark_proc.stdout_str, baseline_str, 'rawshark', baseline_file))
index f53d90abaa625dd5d3d2cec15b8a0ec4947af8ea..26e1a75239141eb8b991657fecb13374f0129244 100755 (executable)
@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+# -*- coding: utf-8 -*-
 #
 # Wireshark tests
 # By Gerald Combs <gerald@wireshark.org>
diff --git a/test/util_dump_dhcp_pcap.py b/test/util_dump_dhcp_pcap.py
new file mode 100755 (executable)
index 0000000..2fbd7fa
--- /dev/null
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Write captures/dhcp.pcap to stdout, optionally writing only packet records or writing them slowly.'''
+
+import argparse
+import os
+import os.path
+import time
+import sys
+
+def main():
+    parser = argparse.ArgumentParser(description='Dump dhcp.pcap')
+    parser.add_argument('dump_type', choices=['cat', 'slow', 'raw'],
+        help='cat: Just dump the file. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.')
+    args = parser.parse_args()
+
+    if sys.version_info[0] < 3 and sys.platform == "win32":
+        import msvcrt
+        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+
+    dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
+
+    dhcp_fd = open(dhcp_pcap, 'rb')
+    contents = dhcp_fd.read()
+    if args.dump_type != 'raw':
+        os.write(1, contents)
+    if args.dump_type == 'cat':
+        sys.exit(0)
+    if args.dump_type == 'slow':
+        time.sleep(1.5)
+    # slow, raw
+    os.write(1, contents[24:])
+    sys.exit(0)
+
+if __name__ == '__main__':
+    main()
+
diff --git a/test/util_slow_dhcp_pcap.py b/test/util_slow_dhcp_pcap.py
deleted file mode 100644 (file)
index c0a34e6..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-#
-# Wireshark tests
-# By Gerald Combs <gerald@wireshark.org>
-#
-# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
-#
-# SPDX-License-Identifier: GPL-2.0-or-later
-#
-'''Write captures/dhcp.pcap to stdout, pause 1.5 seconds, and write it again.'''
-
-import os
-import os.path
-import time
-import sys
-
-if sys.version_info[0] < 3 and sys.platform == "win32":
-    import msvcrt
-    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
-
-dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
-
-dhcp_fd = open(dhcp_pcap, 'rb')
-contents = dhcp_fd.read()
-os.write(1, contents)
-time.sleep(1.5)
-os.write(1, contents[24:])