Start porting our test scripts to Python. Add ctest support.
authorGerald Combs <gerald@wireshark.org>
Tue, 3 Apr 2018 00:12:23 +0000 (17:12 -0700)
committerGerald Combs <gerald@wireshark.org>
Thu, 26 Apr 2018 19:27:19 +0000 (19:27 +0000)
Create Python versions of our various test shell scripts. Add CMake
tests for each suite. Tests can now be run directly via test.py, via the
"test" target, or via ctest, e.g.

  ctest --verbose --jobs 3

Add a testing chapter to the Developer's Guide.

Add a way to disable ctest in dpkg-buildpackage.

Suites completed:
- capture
- clopts
- decryption
- dissection

Remaining suites:
- fileformats
- io
- mergecap
- nameres
- text2pcap
- unittests
- wslua

Change-Id: I8936e05edefc76a86b6a7a5da302e7461bbdda0f
Reviewed-on: https://code.wireshark.org/review/27134
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Peter Wu <peter@lekensteyn.nl>
Reviewed-by: Gerald Combs <gerald@wireshark.org>
14 files changed:
CMakeLists.txt
debian/rules
docbook/developer-guide.asciidoc
docbook/wsdg_src/WSDG_chapter_tests.asciidoc [new file with mode: 0644]
test/README.test
test/config.py [new file with mode: 0644]
test/subprocesstest.py [new file with mode: 0644]
test/suite-decryption.sh
test/suite_capture.py [new file with mode: 0644]
test/suite_clopts.py [new file with mode: 0644]
test/suite_decryption.py [new file with mode: 0644]
test/suite_dissection.py [new file with mode: 0644]
test/test.py [new file with mode: 0755]
test/util_slow_dhcp_pcap.py [new file with mode: 0644]

index 6f57c882c726d1845abb808c85f316aef53433c4..1da14aae24ca86813ba621020e056257aafaa508 100644 (file)
@@ -3134,7 +3134,7 @@ if (DOXYGEN_EXECUTABLE)
        )
 endif(DOXYGEN_EXECUTABLE)
 
-# Test suite wrapper
+# Old test suite wrapper
 if(ENABLE_APPLICATION_BUNDLE)
        set(TEST_SH_BIN_DIR ${CMAKE_BINARY_DIR}/run)
 else()
@@ -3168,6 +3168,38 @@ set_target_properties(test-programs PROPERTIES
        EXCLUDE_FROM_DEFAULT_BUILD True
 )
 
+# Test suites
+enable_testing()
+file(GLOB _test_suite_py_list test/suite_*.py)
+if(WIN32)
+       set(_test_suite_program_path ./run/$<CONFIG>)
+else()
+       set(_test_suite_program_path ./run)
+endif()
+
+# We currently don't handle spaces in arguments. On Windows this
+# means that you will probably have to pass in an interface index
+# instead of a name.
+set(TEST_EXTRA_ARGS "" CACHE STRING "Extra arguments to pass to test/test.py")
+separate_arguments(TEST_EXTRA_ARGS)
+
+# We can enumerate suites two ways: by probing the filesystem and by
+# running `test.py --list-suites`. Probe the filesystem for now, which
+# should hopefully give us enough parallelization. If we want to split
+# our tests by cases or individual tests we'll have to run and parse
+# `test.py --list-cases` or `test.py --list` respectively.
+foreach(_suite_py ${_test_suite_py_list})
+       get_filename_component(_suite_name ${_suite_py} NAME_WE)
+       add_test(
+               NAME ${_suite_name}
+               COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/test/test.py
+                       --program-path ${_test_suite_program_path}
+                       ${TEST_EXTRA_ARGS}
+                       ${_suite_name}
+               )
+       set_tests_properties(${_suite_name} PROPERTIES TIMEOUT 600)
+endforeach()
+
 if (GIT_EXECUTABLE)
        # Update AUTHORS file with entries from git shortlog
        add_custom_target(
index 3d1c3eaed2e75b6b8bfaa561354d315f3882431b..ced9cb339ddc6f381e165ee6d55328c1f44d8ed3 100755 (executable)
@@ -61,3 +61,11 @@ override_dh_fixperms:
                debian/wireshark-dev/usr/share/pyshared/wireshark_be.py \
                debian/wireshark-dev/usr/share/pyshared/wireshark_gen.py
 
+# Adapted from https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=861988
+override_dh_auto_test:
+ifeq ($(filter $(DEB_BUILD_OPTIONS),nocheck),)
+       # XXX Add -- --verbose?
+       dh_auto_test
+else
+       @echo '"DEB_BUILD_OPTIONS" has "nocheck". Skipping tests'
+endif
index 468e42874c8aa673a8675c9deec528115e3f6f8c..c89bd0c606d7762015ee63072ab7c0b2d2c23774 100644 (file)
@@ -56,4 +56,6 @@ include::wsluarm.asciidoc[]
 
 include::wsdg_src/WSDG_chapter_userinterface.asciidoc[]
 
+include::wsdg_src/WSDG_chapter_tests.asciidoc[]
+
 include::common_src/GPL_appendix.asciidoc[]
diff --git a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc
new file mode 100644 (file)
index 0000000..249e5a4
--- /dev/null
@@ -0,0 +1,170 @@
+// WSDG Chapter Setup
+
+[[ChapterTests]]
+== Wireshark Tests
+
+The Wireshark sources include a collection of Python scripts that test
+the features of Wireshark, TShark, Dumpcap, and other programs that
+accompany Wireshark.
+
+The command line options of Wireshark and its companion command line
+tools are numerous. These tests help to ensure that we don't introduce
+bugs as Wireshark grows and evolves.
+
+=== Quick Start
+
+The main testing script is `test.py`. It will attempt to test as much as
+possible by default, including packet capture. This means that you will
+probably either have to supply a capture interface (`--capture-interface
+<interface>`) or disable capture tests (`--disable-capture`).
+
+To run all tests from CMake do the following:
+* Pass `-DTEST_EXTRA_ARGS=--disable-capture` or
+  `-DTEST_EXTRA_ARGS=--capture-interface=<interface>`
+  as needed for your system.
+* Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`.
+
+To run all tests directly, run `test.py -p
+/path/to/wireshark-build/run-directory <capture args>`.
+
+To see a list of all options, run `test.py -h` or `test.py --help`.
+
+To see a list of all tests, run `test.py -l`.
+
+=== Test Coverage And Availability
+
+The testing framework can run programs and check their stdout, stderr,
+and exit codes. It cannot interact with the Wireshark UI. Tests cover
+capture, command line options, decryption, file format support and
+conversion, Lua scripting, and other functionality.
+
+Available tests depend on the libraries with which Wireshark was built.
+For example, some decryption tests depend on a minimum version of
+Libgcrypt and Lua tests depend on Lua.
+
+Capture tests depend on the permissions of the user running the test
+script. We assume that the test user has capture permissions on Windows
+and macOS and capture tests are enabled by default on those platforms.
+
+=== Suites, Cases, and Tests
+
+The `test.py` script uses Python's “unittest” module. Our tests are
+patterned after it, and individual tests are organized according to
+suites, cases, and individual tests. Suites correspond to python modules
+that match the pattern “suite_*.py”. Cases correspond to one or more
+classes in each module, and case class methods matching the pattern
+”test_*” correspond to individual tests. For example, the invalid
+capture filter test in the TShark capture command line options test case
+in the command line options suite has the ID
+“suite_clopts.case_tshark_capture_clopts.test_tshark_invalid_capfilter”.
+
+=== Listing And Running Tests
+
+Tests can be run via the `test.py` Python script. To run all tests,
+either run `test.py` in the directory that contains the Wireshark
+executables (`wireshark`, `tshark`, etc.), or pass the the executable
+path via the `-p` flag:
+
+[source,sh]
+----
+$ python test.py -p /path/to/wireshark-build/run
+----
+
+You can list tests by passing one or more complete or partial names to
+`tshark.py`. The `-l` flag lists tests. By default all tests are shown.
+
+[source,sh]
+----
+# List all tests
+$ python test.py -l
+$ python test.py -l all
+$ python test.py --list
+$ python test.py --list all
+
+# List only tests containing "dumpcap"
+$ python test.py -l dumpcap
+
+# List all suites
+$ python test.py --list-suites
+
+# List all suites and cases
+$ python test.py --list-cases
+----
+
+If one of the listing flags is not present, tests are run. If no names or `all` is supplied,
+all tests are run. Otherwise tests that match are run.
+
+[source,sh]
+----
+# Run all tests
+$ python test.py
+$ python test.py all
+
+# Only run tests containing "dumpcap"
+$ python test.py -l dumpcap
+
+# Run the "clopts" suite
+$ python test.py suite_clopts
+----
+
+=== Adding Or Modifying Tests
+
+Tests must be in a Python module whose name matches “suite_*.py”. The
+module must contain one or more subclasses of “SubprocessTestCase” or
+“unittest.TestCase”. “SubprocessTestCase” is recommended since it
+contains several convenience methods for running processes, checking
+output, and displaying error information. Each test case method
+whose name starts with “test_” constitutes an individual test.
+
+Success or failure conditions can be signalled using the
+“unittest.assertXXX()” or “subprocesstest.assertXXX()” methods.
+
+The “config” module contains common configuration information which has
+been derived from the current environment or specified on the command
+line.
+
+The “subprocesstest” class contains the following methods for running
+processes. Stdout and stderr is written to “<test id>.log”:
+
+startProcess:: Start a process without waiting for it to finish.
+runProcess:: Start a process and wait for it to finish.
+assertRun:: Start a process, wait for it to finish, and check its exit code.
+
+All of the current tests run one or more of Wireshark's suite of
+executables and either checks their return code or their output. A
+simple example is “suite_clopts.case_basic_clopts.test_existing_file”,
+which reads a capture file using TShark and checks its exit code.
+
+[source,python]
+----
+import config
+import subprocesstest
+
+class case_basic_clopts(subprocesstest.SubprocessTestCase):
+    def test_existing_file(self):
+        cap_file = os.path.join(self.capture_dir, 'dhcp.pcap')
+        self.assertRun((config.cmd_tshark, '-r', cap_file))
+----
+
+Program output can be checked using “subprocesstest.grepOutput”
+or “subprocesstest.countOutput”:
+
+[source,python]
+----
+import config
+import subprocesstest
+
+class case_decrypt_80211(subprocesstest.SubprocessTestCase):
+    def test_80211_wpa_psk(self):
+        capture_file = os.path.join(config.capture_dir, 'wpa-Induction.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-r', capture_file,
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('favicon.ico'))
+----
+
index 068d7d881648810b75fd539ca200871f1d3f1a27..efc8f1ea1b52528ab888029ae5e92785e2aad7ff 100644 (file)
@@ -1,79 +1,21 @@
-What is it?
------------
-This is a collection of bash scripts which test the features of:
+Wireshark Tests
 
- - Wireshark
- - TShark
- - Dumpcap
+The main testing script is `test.py`. It will attempt to test as much as
+possible by default, including packet capture. This means that you will
+probably either have to supply a capture interface (`--capture-interface
+<interface>`) or disable capture tests (`--disable-capture`).
 
-Motivation
-----------
+To run all tests from CMake do the following:
+- Pass `-DTEST_EXTRA_ARGS=--disable-capture` or
+  `-DTEST_EXTRA_ARGS=--capture-interface=<interface>`
+  as needed for your system.
+- Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`.
 
-The command line options of Wireshark and the companion command line tools are
-numerous. This makes it hard to find newly introduced bugs doing manual testing
-(try and error) with source code changes.
+To run all tests directly, run `test.py -p
+/path/to/wireshark-build/run-directory <capture args>`.
 
-The current way is to do some changes, testing some scenarios by hand and
-commit the code so other users will complain about new problems. This obviously
-is far from being optimal.
+To see a list of all options, run `test.py -h` or `test.py --help`.
 
-Limitations
------------
+To see a list of all tests, run `test.py -l`.
 
-The test set currently provided will only do some basic tests, but even that
-is far better than nothing. This may involve in time as new tests can be added
-to fix problems reported by users. This will hopefully lead to a "complete"
-and reliable testset in the future.
-
-The tests are limited to command line tests, other things like unit tests or
-GUI test are not included.
-
-Prerequisites
--------------
-
-What you'll need (to do):
-
- - edit the file config.sh to suit your configuration
- - build the "all" target
- - build the "test-programs" target
- - have a bash (cygwin should do well)
- - have tput (e.g. in the cygwin ncurses package)
- - you'll need a network interface with some network traffic
-   (so you can run the capture tests)
- - (for non-Windows platforms) An X server for running the capture tests with
-   the graphical Wireshark program.
-
-A Test Ride
------------
-
-The default configuration might not be suitable for your set-up. Most settings
-can be adjusted by setting an environment variable matching or by editing the
-setting in config.sh.
-
-For instance, the first network interface might not be used for traffic (like an
-unconnected Ethernet port). In that case, you might want to set the environment
-variable TRAFFIC_CAPTURE_IFACE to pick another interface. Use `dumpcap -D` to
-get a list of devices.
-
-On Windows, it is assumed that the user is able to perform captures. On
-non-Windows platforms, the opposite is assumed. If your dumpcap executable
-allows you to perform captures (for example, when it has appropriate
-capabilities), then you can override the default with:
-
-    SKIP_CAPTURE=0
-
-If you do not want to test the binaries in the build directory, you can override
-it with:
-
-    WS_BIN_PATH=/usr/bin
-
-When your configuration is sane, you can start test.sh which should provide a
-basic menu. Just press Enter to start all tests.
-
-It should start all the available tests. Each test will throw out a line
-which should end with a green "Ok". If one of the tests fail, the script
-will report it and stop at this test step.
-
-Please remember to have some ICMP traffic on your network interface! The test
-suite will ping to www.wireshark.org while running capture tests, but this will
-slow down the tests.
+See the “Wireshark Tests” chapter of the Developer's Guide for details.
diff --git a/test/config.py b/test/config.py
new file mode 100644 (file)
index 0000000..78c2bff
--- /dev/null
@@ -0,0 +1,186 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Configuration'''
+
+import os
+import os.path
+import re
+import subprocess
+import sys
+import tempfile
+
+commands = (
+    'dumpcap',
+    'tshark',
+    'wireshark',
+    'capinfos',
+)
+
+can_capture = False
+capture_interface = None
+
+# Our executables
+# Strings
+cmd_tshark = None
+cmd_dumpcap = None
+cmd_wireshark = None
+cmd_capinfos = None
+# Arrays
+args_ping = None
+
+have_lua = False
+have_nghttp2 = False
+have_kerberos = False
+have_libgcrypt17 = False
+
+test_env = None
+home_path = None
+conf_path = None
+this_dir = os.path.dirname(__file__)
+baseline_dir = os.path.join(this_dir, 'baseline')
+capture_dir = os.path.join(this_dir, 'captures')
+config_dir = os.path.join(this_dir, 'config')
+key_dir = os.path.join(this_dir, 'keys')
+lua_dir = os.path.join(this_dir, 'lua')
+
+def canCapture():
+    return can_capture and capture_interface is not None
+
+def setCanCapture(new_cc):
+    can_capture = new_cc
+
+def setCaptureInterface(iface):
+    global capture_interface
+    capture_interface = iface
+
+def canMkfifo():
+    return not sys.platform.startswith('win32')
+
+def canDisplay():
+    if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
+        return True
+    # Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide.
+    return False
+
+def getTsharkInfo():
+    global have_lua
+    global have_nghttp2
+    global have_kerberos
+    global have_libgcrypt17
+    have_lua = False
+    have_nghttp2 = False
+    have_kerberos = False
+    have_libgcrypt17 = False
+    try:
+        tshark_v_blob = str(subprocess.check_output((cmd_tshark, '--version'), stderr=subprocess.PIPE))
+        tshark_v = ' '.join(tshark_v_blob.splitlines())
+        if re.search('with +Lua', tshark_v):
+            have_lua = True
+        if re.search('with +nghttp2', tshark_v):
+            have_nghttp2 = True
+        if re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v):
+            have_kerberos = True
+        gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v)
+        have_libgcrypt = gcry_m and float(gcry_m.group(1)) >= 1.7
+    except:
+        pass
+
+def getDefaultCaptureInterface():
+    '''Choose a default capture interface for our platform. Currently Windows only.'''
+    global capture_interface
+    if capture_interface:
+        return
+    if cmd_dumpcap is None:
+        return
+    if not sys.platform.startswith('win32'):
+        return
+    try:
+        dumpcap_d = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE)
+        for d_line in dumpcap_d.splitlines():
+            iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line)
+            if iface_m:
+                capture_interface = iface_m.group(1)
+                break
+    except:
+        pass
+
+def getPingCommand():
+    '''Return an argument list required to ping www.wireshark.org for 60 seconds.'''
+    global args_ping
+    # XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds.
+    if sys.platform.startswith('win32'):
+        # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
+        args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
+    elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
+        args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
+    elif sys.platform.startswith('darwin'):
+        args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
+    # XXX Other BSDs, Solaris, etc
+
+def setProgramPath(path):
+    global program_path
+    program_path = path
+    retval = True
+    dotexe = ''
+    if sys.platform.startswith('win32'):
+        dotexe = '.exe'
+    for cmd in commands:
+        cmd_var = 'cmd_' + cmd
+        cmd_path = os.path.join(path, cmd + dotexe)
+        if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK):
+            cmd_path = None
+            retval = False
+        globals()[cmd_var] = cmd_path
+    getTsharkInfo()
+    getDefaultCaptureInterface()
+    return retval
+
+def testEnvironment():
+    return test_env
+
+def setUpTestEnvironment():
+    global home_path
+    global conf_path
+    global test_env
+    test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.')
+    home_path = os.path.join(test_confdir, 'home')
+    if sys.platform.startswith('win32'):
+        home_env = 'APPDATA'
+        conf_path = os.path.join(home_path, 'Wireshark')
+    else:
+        home_env = 'HOME'
+        conf_path = os.path.join(home_path, '.config', 'wireshark')
+    os.makedirs(conf_path)
+    test_env = os.environ.copy()
+    test_env[home_env] = home_path
+
+def setUpConfigFile(conf_file):
+    global home_path
+    global conf_path
+    if home_path is None or conf_path is None:
+        setUpTestEnvironment()
+    template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl'
+    with open(template, 'r') as tplt_fd:
+        tplt_contents = tplt_fd.read()
+        tplt_fd.close()
+        key_dir_path = os.path.join(key_dir, '')
+        # uat.c replaces backslashes...
+        key_dir_path = key_dir_path.replace('\\', '\\x5c')
+        cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path)
+    out_file = os.path.join(conf_path, conf_file)
+    with open(out_file, 'w') as cf_fd:
+        cf_fd.write(cf_contents)
+        cf_fd.close()
+
+if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
+    can_capture = True
+
+# Initialize ourself.
+getPingCommand()
+setProgramPath(os.path.curdir)
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
new file mode 100644 (file)
index 0000000..18313db
--- /dev/null
@@ -0,0 +1,206 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Subprocess test case superclass'''
+
+import io
+import os
+import os.path
+import re
+import subprocess
+import sys
+import unittest
+
+# To do:
+# - Add a subprocesstest.SkipUnlessCapture decorator?
+# - Try to catch crashes? See the comments below in waitProcess.
+
+# XXX This should probably be in config.py and settable from
+# the command line.
+if sys.version_info[0] >= 3:
+    process_timeout = 300 # Seconds
+
+class LoggingPopen(subprocess.Popen):
+    '''Run a process using subprocess.Popen. Capture and log its output.
+
+    Stdout and stderr are captured to memory and decoded as UTF-8. The
+    program command and output is written to log_fd.
+    '''
+    def __init__(self, proc_args, *args, **kwargs):
+        self.log_fd = kwargs.pop('log_fd', None)
+        kwargs['stdout'] = subprocess.PIPE
+        kwargs['stderr'] = subprocess.PIPE
+        # Make sure communicate() gives us bytes.
+        kwargs['universal_newlines'] = False
+        self.cmd_str = 'command ' + repr(proc_args)
+        super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
+        self.stdout_str = ''
+        self.stderr_str = ''
+
+    def wait_and_log(self):
+        '''Wait for the process to finish and log its output.'''
+        # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
+        if sys.version_info[0] >= 3:
+            out_data, err_data = self.communicate(timeout=process_timeout)
+            out_log = out_data.decode('UTF-8', 'replace')
+            err_log = err_data.decode('UTF-8', 'replace')
+        else:
+            out_data, err_data = self.communicate()
+            out_log = unicode(out_data, 'UTF-8', 'replace')
+            err_log = unicode(err_data, 'UTF-8', 'replace')
+        # Throwing a UnicodeDecodeError exception here is arguably a good thing.
+        self.stdout_str = out_data.decode('UTF-8', 'strict')
+        self.stderr_str = err_data.decode('UTF-8', 'strict')
+        self.log_fd.flush()
+        self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
+        self.log_fd.write(out_log)
+        self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
+        self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
+        self.log_fd.write(err_log)
+        self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
+        self.log_fd.flush()
+
+    def stop_process(self, kill=False):
+        '''Stop the process immediately.'''
+        if kill:
+            super(LoggingPopen, self).kill()
+        else:
+            super(LoggingPopen, self).terminate()
+
+    def terminate(self):
+        '''Terminate the process. Do not log its output.'''
+        # XXX Currently unused.
+        self.stop_process(kill=False)
+
+    def kill(self):
+        '''Kill the process. Do not log its output.'''
+        self.stop_process(kill=True)
+
+class SubprocessTestCase(unittest.TestCase):
+    '''Run a program and gather its stdout and stderr.'''
+
+    def __init__(self, *args, **kwargs):
+        super(SubprocessTestCase, self).__init__(*args, **kwargs)
+        self.exit_ok = 0
+        self.exit_command_line = 1
+        self.exit_error = 2
+        self.exit_code = None
+        self.log_fname = None
+        self.log_fd = None
+        self.processes = []
+        self.cleanup_files = []
+        self.dump_files = []
+
+    def log_fd_write_bytes(self, log_data):
+        if sys.version_info[0] >= 3:
+            self.log_fd.write(log_data)
+        else:
+            self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
+
+    def filename_from_id(self, filename):
+        '''Generate a filename prefixed with our test ID.'''
+        return self.id() + '.' + filename
+
+    def kill_processes(self):
+        '''Kill any processes we've opened so far'''
+        for proc in self.processes:
+            try:
+                proc.kill()
+            except:
+                pass
+
+    def run(self, result=None):
+        # Subclass run() so that we can do the following:
+        # - Open our log file and add it to the cleanup list.
+        # - Check our result before and after the run so that we can tell
+        #   if the current test was successful.
+
+        # Probably not needed, but shouldn't hurt.
+        self.kill_processes()
+        self.processes = []
+        self.log_fname = self.filename_from_id('log')
+        # Our command line utilities generate UTF-8. The log file endcoding
+        # needs to match that.
+        self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8')
+        self.cleanup_files.append(self.log_fname)
+        pre_run_problem_count = 0
+        if result:
+            pre_run_problem_count = len(result.failures) + len(result.errors)
+        try:
+            super(SubprocessTestCase, self).run(result=result)
+        except KeyboardInterrupt:
+            # XXX This doesn't seem to work on Windows, which is where we need it the most.
+            self.kill_processes()
+
+        # Tear down our test. We don't do this in tearDown() because Python 3
+        # updates "result" after calling tearDown().
+        self.kill_processes()
+        self.log_fd.close()
+        if result:
+            post_run_problem_count = len(result.failures) + len(result.errors)
+            if pre_run_problem_count != post_run_problem_count:
+                self.dump_files.append(self.log_fname)
+                # Leave some evidence behind.
+                self.cleanup_files = []
+                print('\nProcess output for {}:'.format(self.id()))
+                with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
+                    for line in log_fd:
+                        sys.stdout.write(line)
+        for filename in self.cleanup_files:
+            try:
+                os.unlink(filename)
+            except OSError:
+                pass
+        self.cleanup_files = []
+
+    def countOutput(self, search_pat, proc=None):
+        '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
+        match_count = 0
+        if proc is None:
+            proc = self.processes[-1]
+        # We might want to let the caller decide what we're searching.
+        out_data = proc.stdout_str + proc.stderr_str
+        search_re = re.compile(search_pat)
+        for line in out_data.splitlines():
+            if search_re.search(line):
+                match_count += 1
+        return match_count
+
+    def grepOutput(self, search_pat, proc=None):
+        return self.countOutput(search_pat, proc) > 0
+
+    def startProcess(self, proc_args, env=None, shell=False):
+        '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().'''
+        proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
+        self.processes.append(proc)
+        return proc
+
+    def waitProcess(self, process):
+        '''Wait for a process to finish.'''
+        process.wait_and_log()
+        # XXX The shell version ran processes using a script called run_and_catch_crashes
+        # which looked for core dumps and printed stack traces if found. We might want
+        # to do something similar here. This may not be easy on modern Ubuntu systems,
+        # which default to using Apport: https://wiki.ubuntu.com/Apport
+
+    def assertWaitProcess(self, process, expected_return=0):
+        '''Wait for a process to finish and check its exit code.'''
+        process.wait_and_log()
+        self.assertEqual(process.returncode, expected_return)
+
+    def runProcess(self, args, env=None, shell=False):
+        '''Start a process and wait for it to finish.'''
+        process = self.startProcess(args, env=env, shell=shell)
+        process.wait_and_log()
+        return process
+
+    def assertRun(self, args, env=None, shell=False, expected_return=0):
+        '''Start a process and wait for it to finish. Check its return code.'''
+        process = self.runProcess(args, env=env, shell=shell)
+        self.assertEqual(process.returncode, expected_return)
+        return process
index 7f24143567e0822ed2c917fdf509a4168e58b4d8..f81c2bec2c85a58312c11a1bc72c5c3d28b8d9f9 100755 (executable)
@@ -183,22 +183,6 @@ decryption_step_udt_dtls() {
        test_step_ok
 }
 
-# IPsec ESP
-# https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671
-decryption_step_ipsec_esp() {
-       $TESTS_DIR/run_and_catch_crashes env $TS_DC_ENV $TSHARK $TS_DC_ARGS \
-               -o "esp.enable_encryption_decode: TRUE" \
-               -Tfields -e data.data \
-               -r "$CAPTURE_DIR/esp-bug-12671.pcapng.gz" -Y data \
-               | grep "08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17" > /dev/null 2>&1
-       RETURNVALUE=$?
-       if [ ! $RETURNVALUE -eq $EXIT_OK ]; then
-               test_step_failed "Failed to decrypt DTLS"
-               return
-       fi
-       test_step_ok
-}
-
 # SSL, using the server's private key
 # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil2_070531.tgz
 decryption_step_ssl() {
@@ -419,6 +403,22 @@ decryption_step_dvb_ci() {
        test_step_ok
 }
 
+# IPsec ESP
+# https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671
+decryption_step_ipsec_esp() {
+       $TESTS_DIR/run_and_catch_crashes env $TS_DC_ENV $TSHARK $TS_DC_ARGS \
+               -o "esp.enable_encryption_decode: TRUE" \
+               -Tfields -e data.data \
+               -r "$CAPTURE_DIR/esp-bug-12671.pcapng.gz" -Y data \
+               | grep "08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17" > /dev/null 2>&1
+       RETURNVALUE=$?
+       if [ ! $RETURNVALUE -eq $EXIT_OK ]; then
+               test_step_failed "Failed to decrypt DTLS"
+               return
+       fi
+       test_step_ok
+}
+
 # IKEv1 (ISAKMP) with certificates
 # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7951
 decryption_step_ikev1_certs() {
diff --git a/test/suite_capture.py b/test/suite_capture.py
new file mode 100644 (file)
index 0000000..a6f7e17
--- /dev/null
@@ -0,0 +1,321 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Capture tests'''
+
+import config
+import os
+import re
+import subprocess
+import subprocesstest
+import sys
+import time
+import unittest
+
+capture_duration = 5
+
+testout_pcap = 'testout.pcap'
+snapshot_len = 96
+capture_env = os.environ.copy()
+capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
+
+def capture_command(cmd, *args, **kwargs):
+    shell = kwargs.pop('shell', False)
+    if shell:
+        cap_cmd = ['"' + cmd + '"']
+    else:
+        cap_cmd = [cmd]
+    if cmd == config.cmd_wireshark:
+        cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
+    cap_cmd += args
+    if shell:
+        return ' '.join(cap_cmd)
+    else:
+        return cap_cmd
+
+def slow_dhcp_command():
+    # XXX Do this in Python in a thread?
+    sd_cmd = ''
+    if sys.executable:
+        sd_cmd = sys.executable + ' '
+    sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py')
+    return sd_cmd
+
+def start_pinging(self):
+    ping_procs = []
+    if sys.platform.startswith('win32'):
+        # Fake '-i' with a subsecond interval.
+        for st in (0.1, 0.1, 0):
+            ping_procs.append(self.startProcess(config.args_ping))
+            time.sleep(st)
+    else:
+        ping_procs.append(self.startProcess(config.args_ping))
+    return ping_procs
+
+def stop_pinging(ping_procs):
+    for proc in ping_procs:
+        proc.kill()
+
+def check_testout_num_packets(self, num_packets, cap_file=None):
+    got_num_packets = False
+    if not cap_file:
+        cap_file = self.filename_from_id(testout_pcap)
+    self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
+    capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
+    self.log_fd_write_bytes(capinfos_testout)
+    count_pat = 'Number of packets:\s+{}'.format(num_packets)
+    if re.search(count_pat, capinfos_testout):
+        got_num_packets = True
+    self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
+
+def check_capture_10_packets(self, cmd=None, to_stdout=False):
+    if not config.canCapture():
+        self.skipTest('Test requires capture privileges and an interface.')
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    if not config.args_ping:
+        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+    self.assertIsNotNone(cmd)
+    testout_file = self.filename_from_id(testout_pcap)
+    ping_procs = start_pinging(self)
+    if to_stdout:
+        capture_proc = self.runProcess(capture_command(cmd,
+            '-i', '"{}"'.format(config.capture_interface),
+            '-p',
+            '-w', '-',
+            '-c', '10',
+            '-a', 'duration:{}'.format(capture_duration),
+            '-f', '"icmp || icmp6"',
+            '>', testout_file,
+            shell=True
+        ),
+        env=capture_env,
+        shell=True
+        )
+    else:
+        capture_proc = self.runProcess(capture_command(cmd,
+            '-i', config.capture_interface,
+            '-p',
+            '-w', testout_file,
+            '-c', '10',
+            '-a', 'duration:{}'.format(capture_duration),
+            '-f', 'icmp || icmp6',
+        ),
+        env=capture_env
+        )
+    capture_returncode = capture_proc.returncode
+    stop_pinging(ping_procs)
+    self.cleanup_files.append(testout_file)
+    if capture_returncode != 0:
+        self.log_fd.write('{} -D output:\n'.format(cmd))
+        self.runProcess((cmd, '-D'))
+    self.assertEqual(capture_returncode, 0)
+    if (capture_returncode == 0):
+        check_testout_num_packets(self, 10)
+
+def check_capture_fifo(self, cmd=None):
+    if not config.canMkfifo():
+        self.skipTest('Test requires OS fifo support.')
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    self.assertIsNotNone(cmd)
+    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+    testout_file = self.filename_from_id(testout_pcap)
+    fifo_file = self.filename_from_id('testout.fifo')
+    self.cleanup_files.append(fifo_file)
+    try:
+        # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+        os.unlink(fifo_file)
+    except:
+        pass
+    os.mkfifo(fifo_file)
+    slow_dhcp_cmd = slow_dhcp_command()
+    fifo_proc = self.startProcess(
+        ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
+        shell=True)
+    capture_proc = self.runProcess(capture_command(cmd,
+        '-i', fifo_file,
+        '-p',
+        '-w', testout_file,
+        '-a', 'duration:{}'.format(capture_duration),
+    ),
+    env=capture_env
+    )
+    self.cleanup_files.append(testout_file)
+    fifo_proc.kill()
+    self.assertTrue(os.path.isfile(testout_file))
+    capture_returncode = capture_proc.returncode
+    self.assertEqual(capture_returncode, 0)
+    if (capture_returncode == 0):
+        check_testout_num_packets(self, 8)
+
+def check_capture_stdin(self, cmd=None):
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    self.assertIsNotNone(cmd)
+    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+    testout_file = self.filename_from_id(testout_pcap)
+    slow_dhcp_cmd = slow_dhcp_command()
+    capture_cmd = capture_command(cmd,
+        '-i', '-',
+        '-w', testout_file,
+        '-a', 'duration:{}'.format(capture_duration),
+        shell=True
+    )
+    if cmd == config.cmd_wireshark:
+        capture_cmd += ' -o console.log.level:127'
+    pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
+    self.cleanup_files.append(testout_file)
+    pipe_returncode = pipe_proc.returncode
+    self.assertEqual(pipe_returncode, 0)
+    if cmd == config.cmd_wireshark:
+        self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
+        self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
+        self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
+    self.assertTrue(os.path.isfile(testout_file))
+    if (pipe_returncode == 0):
+        check_testout_num_packets(self, 8)
+
+def check_capture_2multi_10packets(self, cmd=None):
+    # This was present in the Bash version but was incorrect and not part of any suite.
+    # It's apparently intended to test file rotation.
+    self.skipTest('Not yet implemented')
+
+def check_capture_read_filter(self, cmd=None):
+    if not config.canCapture():
+        self.skipTest('Test requires capture privileges and an interface.')
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    if not config.args_ping:
+        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+    self.assertIsNotNone(cmd)
+    ping_procs = start_pinging(self)
+    testout_file = self.filename_from_id(testout_pcap)
+    capture_proc = self.runProcess(capture_command(cmd,
+        '-i', config.capture_interface,
+        '-p',
+        '-w', testout_file,
+        '-2',
+        '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
+        '-c', '10',
+        '-a', 'duration:{}'.format(capture_duration),
+        '-f', 'icmp || icmp6',
+    ),
+    env=capture_env
+    )
+    capture_returncode = capture_proc.returncode
+    stop_pinging(ping_procs)
+    self.cleanup_files.append(testout_file)
+    self.assertEqual(capture_returncode, 0)
+
+    if (capture_returncode == 0):
+        check_testout_num_packets(self, 0)
+
+def check_capture_snapshot_len(self, cmd=None):
+    if not config.canCapture():
+        self.skipTest('Test requires capture privileges and an interface.')
+    if cmd == config.cmd_wireshark and not config.canDisplay():
+        self.skipTest('Test requires a display.')
+    if not config.args_ping:
+        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+    self.assertIsNotNone(cmd)
+    ping_procs = start_pinging(self)
+    testout_file = self.filename_from_id(testout_pcap)
+    capture_proc = self.runProcess(capture_command(cmd,
+        '-i', config.capture_interface,
+        '-p',
+        '-w', testout_file,
+        '-s', str(snapshot_len),
+        '-a', 'duration:{}'.format(capture_duration),
+        '-f', 'icmp || icmp6',
+    ),
+    env=capture_env
+    )
+    capture_returncode = capture_proc.returncode
+    stop_pinging(ping_procs)
+    self.cleanup_files.append(testout_file)
+    self.assertEqual(capture_returncode, 0)
+    self.assertTrue(os.path.isfile(testout_file))
+
+    # Use tshark to filter out all packets larger than 68 bytes.
+    testout2_file = self.filename_from_id('testout2.pcap')
+
+    filter_proc = self.runProcess((config.cmd_tshark,
+        '-r', testout_file,
+        '-w', testout2_file,
+        '-Y', 'frame.cap_len>{}'.format(snapshot_len),
+    ))
+    filter_returncode = filter_proc.returncode
+    self.cleanup_files.append(testout2_file)
+    self.assertEqual(capture_returncode, 0)
+    if (capture_returncode == 0):
+        check_testout_num_packets(self, 0, cap_file=testout2_file)
+
+class case_wireshark_capture(subprocesstest.SubprocessTestCase):
+    def test_wireshark_capture_10_packets_to_file(self):
+        '''Capture 10 packets from the network to a file using Wireshark'''
+        check_capture_10_packets(self, cmd=config.cmd_wireshark)
+
+    # Wireshark doesn't currently support writing to stdout while capturing.
+    # def test_wireshark_capture_10_packets_to_stdout(self):
+    #     '''Capture 10 packets from the network to stdout using Wireshark'''
+    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
+
+    def test_wireshark_capture_from_fifo(self):
+        '''Capture from a fifo using Wireshark'''
+        check_capture_fifo(self, cmd=config.cmd_wireshark)
+
+    def test_wireshark_capture_from_stdin(self):
+        '''Capture from stdin using Wireshark'''
+        check_capture_stdin(self, cmd=config.cmd_wireshark)
+
+    def test_wireshark_capture_snapshot_len(self):
+        '''Capture truncated packets using Wireshark'''
+        check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
+
+class case_tshark_capture(subprocesstest.SubprocessTestCase):
+    def test_tshark_capture_10_packets_to_file(self):
+        '''Capture 10 packets from the network to a file using TShark'''
+        check_capture_10_packets(self, cmd=config.cmd_tshark)
+
+    def test_tshark_capture_10_packets_to_stdout(self):
+        '''Capture 10 packets from the network to stdout using TShark'''
+        check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+
+    def test_tshark_capture_from_fifo(self):
+        '''Capture from a fifo using TShark'''
+        check_capture_fifo(self, cmd=config.cmd_tshark)
+
+    def test_tshark_capture_from_stdin(self):
+        '''Capture from stdin using TShark'''
+        check_capture_stdin(self, cmd=config.cmd_tshark)
+
+    def test_tshark_capture_snapshot_len(self):
+        '''Capture truncated packets using TShark'''
+        check_capture_snapshot_len(self, cmd=config.cmd_tshark)
+
+class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
+    def test_dumpcap_capture_10_packets_to_file(self):
+        '''Capture 10 packets from the network to a file using Dumpcap'''
+        check_capture_10_packets(self, cmd=config.cmd_dumpcap)
+
+    def test_dumpcap_capture_10_packets_to_stdout(self):
+        '''Capture 10 packets from the network to stdout using Dumpcap'''
+        check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
+
+    def test_dumpcap_capture_from_fifo(self):
+        '''Capture from a fifo using Dumpcap'''
+        check_capture_fifo(self, cmd=config.cmd_dumpcap)
+
+    def test_dumpcap_capture_from_stdin(self):
+        '''Capture from stdin using Dumpcap'''
+        check_capture_stdin(self, cmd=config.cmd_dumpcap)
+
+    def test_dumpcap_capture_snapshot_len(self):
+        '''Capture truncated packets using Dumpcap'''
+        check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
diff --git a/test/suite_clopts.py b/test/suite_clopts.py
new file mode 100644 (file)
index 0000000..3716585
--- /dev/null
@@ -0,0 +1,176 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Command line option tests'''
+
+import config
+import os.path
+import subprocess
+import subprocesstest
+import unittest
+
+#glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
+
+glossaries = ('decodes', 'values')
+
+class case_dumpcap_invalid_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_dumpcap_invalid_chars(self):
+        '''Invalid dumpcap parameters'''
+        for char_arg in 'CEFGHJKNOQRTUVWXYejloxz':
+            self.assertRun((config.cmd_dumpcap, '-' + char_arg),
+                           expected_return=self.exit_command_line)
+
+
+class case_dumpcap_valid_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_dumpcap_valid_chars(self):
+        for char_arg in 'hv':
+            self.assertRun((config.cmd_dumpcap, '-' + char_arg))
+
+
+class case_dumpcap_invalid_interface_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_dumpcap_interface_chars(self):
+        '''Valid dumpcap parameters requiring capture permissions'''
+        valid_returns = [self.exit_ok, self.exit_error]
+        for char_arg in 'DL':
+            process = self.runProcess((config.cmd_dumpcap, '-' + char_arg))
+            self.assertIn(process.returncode, valid_returns)
+
+
+class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_dumpcap_invalid_capfilter(self):
+        '''Invalid capture filter'''
+        invalid_filter = '__invalid_protocol'
+        # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', 'testout.pcap' ))
+        self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
+
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_dumpcap_invalid_interface_name(self):
+        '''Invalid capture interface name'''
+        invalid_interface = '__invalid_interface'
+        # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', 'testout.pcap'))
+        self.assertTrue(self.grepOutput('The capture session could not be initiated'))
+
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_dumpcap_invalid_interface_index(self):
+        '''Invalid capture interface index'''
+        invalid_index = '0'
+        # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', 'testout.pcap'))
+        self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
+
+
+class case_basic_clopts(subprocesstest.SubprocessTestCase):
+    def test_existing_file(self):
+        # $TSHARK -r "${CAPTURE_DIR}dhcp.pcap" > ./testout.txt 2>&1
+        cap_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+        self.assertRun((config.cmd_tshark, '-r', cap_file))
+
+    def test_nonexistent_file(self):
+        # $TSHARK - r ThisFileDontExist.pcap > ./testout.txt 2 > &1
+        cap_file = os.path.join(config.capture_dir, '__ceci_nest_pas_une.pcap')
+        self.assertRun((config.cmd_tshark, '-r', cap_file),
+                       expected_return=self.exit_error)
+
+
+class case_tshark_invalid_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_tshark_invalid_chars(self):
+        '''Invalid tshark parameters'''
+        for char_arg in 'ABCEFHJKMNORTUWXYZabcdefijkmorstuwyz':
+            self.assertRun((config.cmd_tshark, '-' + char_arg),
+                           expected_return=self.exit_command_line)
+
+
+class case_tshark_valid_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_tshark_valid_chars(self):
+        for char_arg in 'Ghv':
+            self.assertRun((config.cmd_tshark, '-' + char_arg))
+
+
+class case_tshark_invalid_interface_chars(subprocesstest.SubprocessTestCase):
+    # XXX Should we generate individual test functions instead of looping?
+    def test_tshark_interface_chars(self):
+        '''Valid tshark parameters requiring capture permissions'''
+        valid_returns = [self.exit_ok, self.exit_error]
+        for char_arg in 'DL':
+            process = self.runProcess((config.cmd_tshark, '-' + char_arg))
+            self.assertIn(process.returncode, valid_returns)
+
+
+class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_tshark_invalid_capfilter(self):
+        '''Invalid capture filter'''
+        invalid_filter = '__invalid_protocol'
+        # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', 'testout.pcap' ))
+        self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
+
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_tshark_invalid_interface_name(self):
+        '''Invalid capture interface name'''
+        invalid_interface = '__invalid_interface'
+        # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', 'testout.pcap'))
+        self.assertTrue(self.grepOutput('The capture session could not be initiated'))
+
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_tshark_invalid_interface_index(self):
+        '''Invalid capture interface index'''
+        invalid_index = '0'
+        # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
+        self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', 'testout.pcap'))
+        self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
+
+
+class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase):
+    @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges')
+    def test_tshark_valid_name_resolution(self):
+        # $TSHARK -N mntC -a duration:1 > ./testout.txt 2>&1
+        self.assertRun((config.cmd_tshark, '-N', 'mntC', '-a', 'duration: 1'))
+
+    # XXX Add invalid name resolution.
+
+class case_tshark_dump_glossaries(subprocesstest.SubprocessTestCase):
+    def test_tshark_dump_glossary(self):
+        for glossary in glossaries:
+            try:
+                self.log_fd.truncate()
+            except:
+                pass
+            self.assertRun((config.cmd_tshark, '-G', glossary))
+
+    def test_tshark_glossary_valid_utf8(self):
+        for glossary in glossaries:
+            env = os.environ.copy()
+            env['LANG'] = 'en_US.UTF-8'
+            g_contents = subprocess.check_output((config.cmd_tshark, '-G', glossary), env=env, stderr=subprocess.PIPE)
+            decoded = True
+            try:
+                g_contents.decode('UTF-8')
+            except UnicodeDecodeError:
+                decoded = False
+            self.assertTrue(decoded, '{} is not valid UTF-8'.format(glossary))
+
+    def test_tshark_glossary_plugin_count(self):
+        self.runProcess((config.cmd_tshark, '-G', 'plugins'))
+        self.assertGreaterEqual(self.countOutput('dissector'), 10, 'Fewer than 10 dissector plugins found')
+
+
+# Purposefully fail a test. Used for testing the test framework.
+# class case_fail_on_purpose(subprocesstest.SubprocessTestCase):
+#     def test_fail_on_purpose(self):
+#         self.runProcess(('echo', 'hello, world'))
+#         self.fail('Not implemented')
diff --git a/test/suite_decryption.py b/test/suite_decryption.py
new file mode 100644 (file)
index 0000000..5b0bc77
--- /dev/null
@@ -0,0 +1,496 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Decryption tests'''
+
+import config
+import os.path
+import subprocesstest
+import unittest
+
+uat_files = [
+    '80211_keys',
+    'dtlsdecrypttablefile',
+    'esp_sa',
+    'ssl_keys',
+    'c1222_decryption_table',
+    'ikev1_decryption_table',
+    'ikev2_decryption_table',
+]
+for uat in uat_files:
+    config.setUpConfigFile(uat)
+
+
+class case_decrypt_80211(subprocesstest.SubprocessTestCase):
+    def test_80211_wpa_psk(self):
+        '''IEEE 802.11 WPA PSK'''
+        # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=wpa-Induction.pcap
+        capture_file = os.path.join(config.capture_dir, 'wpa-Induction.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-r', capture_file,
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('favicon.ico'))
+
+    def test_80211_wpa_eap(self):
+        '''IEEE 802.11 WPA EAP (EAPOL Rekey)'''
+        # Included in git sources test/captures/wpa-eap-tls.pcap.gz
+        capture_file = os.path.join(config.capture_dir, 'wpa-eap-tls.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-r', capture_file,
+                '-Y', 'wlan.analysis.tk==7d9987daf5876249b6c773bf454a0da7',
+                ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('Group Message'))
+
+    def test_80211_wpa_eapol_incomplete_rekeys(self):
+        '''WPA decode with message1+2 only and secure bit set on message 2'''
+        # Included in git sources test/captures/wpa-test-decode.pcap.gz
+        capture_file = os.path.join(config.capture_dir, 'wpa-test-decode.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-r', capture_file,
+                '-Y', 'icmp.resp_to == 4263',
+                ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('Echo'))
+
+    def test_80211_wpa_psk_mfp(self):
+        '''WPA decode management frames with MFP enabled (802.11w)'''
+        # Included in git sources test/captures/wpa-test-decode-mgmt.pcap.gz
+        capture_file = os.path.join(config.capture_dir, 'wpa-test-decode-mgmt.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-r', capture_file,
+                '-Y', 'wlan.fixed.reason_code == 2 || wlan.fixed.category_code == 3',
+                ),
+            env=config.test_env)
+        self.assertEqual(self.countOutput('802.11.*SN=.*FN=.*Flags='), 3)
+
+
+    def test_80211_wpa_tdls(self):
+        '''WPA decode traffic in a TDLS (Tunneled Direct-Link Setup) session (802.11z)'''
+        # Included in git sources test/captures/wpa-test-decode-tdls.pcap.gz
+        capture_file = os.path.join(config.capture_dir, 'wpa-test-decode-tdls.pcap.gz')
+        self.runProcess((config.cmd_tshark,
+                '-o', 'wlan.enable_decryption: TRUE',
+                '-r', capture_file,
+                '-Y', 'icmp',
+                ),
+            env=config.test_env)
+        self.assertEqual(self.countOutput('ICMP.*Echo .ping'), 2)
+
+class case_decrypt_dtls(subprocesstest.SubprocessTestCase):
+    def test_dtls(self):
+        '''DTLS'''
+        # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil.tgz
+        capture_file = os.path.join(config.capture_dir, 'snakeoil-dtls.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'data.data',
+                '-Y', 'data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('69:74:20:77:6f:72:6b:20:21:0a'))
+
+    def test_dtls_psk_aes128ccm8(self):
+        '''DTLS 1.2 with PSK, AES-128-CCM-8'''
+        capture_file = os.path.join(config.capture_dir, 'dtls12-aes128ccm8.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'dtls.psk:ca19e028a8a372ad2d325f950fcaceed',
+                '-x'
+            ),
+            env=config.test_env)
+        dt_count = self.countOutput('Decrypted DTLS')
+        wfm_count = self.countOutput('Works for me!.')
+        self.assertTrue(dt_count == 7 and wfm_count == 2)
+
+    def test_dtls_udt(self):
+        '''UDT over DTLS 1.2 with RSA key'''
+        capture_file = os.path.join(config.capture_dir, 'udt-dtls.pcapng.gz')
+        key_file = os.path.join(config.key_dir, 'udt-dtls.key')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'dtls.keys_list:0.0.0.0,0,data,{}'.format(key_file),
+                '-Y', 'dtls && udt.type==ack',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('UDT'))
+
+class case_decrypt_tls(subprocesstest.SubprocessTestCase):
+    def test_ssl(self):
+        '''SSL using the server's private key'''
+        # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil2_070531.tgz
+        capture_file = os.path.join(config.capture_dir, 'rsasnakeoil2.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('favicon.ico'))
+
+    def test_ssl_rsa_pq(self):
+        '''SSL using the server's private key with p < q
+        (test whether libgcrypt is correctly called)'''
+        capture_file = os.path.join(config.capture_dir, 'rsa-p-lt-q.pcap')
+        key_file = os.path.join(config.key_dir, 'rsa-p-lt-q.key')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.keys_list:0.0.0.0,443,http,{}'.format(key_file),
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('/'))
+
+    def test_ssl_with_password(self):
+        '''SSL using the server's private key with password'''
+        capture_file = os.path.join(config.capture_dir, 'dmgr.pcapng')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('unsecureLogon.jsp'))
+
+    def test_ssl_master_secret(self):
+        '''SSL using the master secret'''
+        capture_file = os.path.join(config.capture_dir, 'dhe1.pcapng.gz')
+        key_file = os.path.join(config.key_dir, 'dhe1_keylog.dat')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.keylog_file: {}'.format(key_file),
+                '-o', 'ssl.desegment_ssl_application_data: FALSE',
+                '-o', 'http.ssl.port: 443',
+                '-Tfields',
+                '-e', 'http.request.uri',
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('test'))
+
+    def test_tls12_renegotiation(self):
+        '''TLS 1.2 with renegotiation'''
+        capture_file = os.path.join(config.capture_dir, 'tls-renegotiation.pcap')
+        key_file = os.path.join(config.key_dir, 'rsasnakeoil2.key')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.keys_list:0.0.0.0,4433,http,{}'.format(key_file),
+                '-Tfields',
+                '-e', 'http.content_length',
+                '-Y', 'http',
+            ),
+            env=config.test_env)
+        count_0 = self.countOutput('^0$')
+        count_2151 = self.countOutput('^2151$')
+        self.assertTrue(count_0 == 1 and count_2151 == 1)
+
+    def test_tls12_psk_aes128ccm(self):
+        '''TLS 1.2 with PSK, AES-128-CCM'''
+        capture_file = os.path.join(config.capture_dir, 'tls12-aes128ccm.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.psk:ca19e028a8a372ad2d325f950fcaceed',
+                '-q',
+                '-z', 'follow,ssl,ascii,0',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('http://www.gnu.org/software/gnutls'))
+
+    def test_tls12_psk_aes256gcm(self):
+        '''TLS 1.2 with PSK, AES-256-GCM'''
+        capture_file = os.path.join(config.capture_dir, 'tls12-aes256gcm.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.psk:ca19e028a8a372ad2d325f950fcaceed',
+                '-q',
+                '-z', 'follow,ssl,ascii,0',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('http://www.gnu.org/software/gnutls'))
+
+    def test_tls12_chacha20poly1305(self):
+        '''TLS 1.2 with ChaCha20-Poly1305'''
+        if not config.have_libgcrypt17:
+            self.skipTest('Requires GCrypt 1.7 or later.')
+        capture_file = os.path.join(config.capture_dir, 'tls12-chacha20poly1305.pcap')
+        key_file = os.path.join(config.key_dir, 'tls12-chacha20poly1305.keys')
+        ciphers=[
+            'ECDHE-ECDSA-CHACHA20-POLY1305',
+            'ECDHE-RSA-CHACHA20-POLY1305',
+            'DHE-RSA-CHACHA20-POLY1305',
+            'RSA-PSK-CHACHA20-POLY1305',
+            'DHE-PSK-CHACHA20-POLY1305',
+            'ECDHE-PSK-CHACHA20-POLY1305',
+            'PSK-CHACHA20-POLY1305',
+        ]
+        stream = 0
+        for cipher in ciphers:
+            self.runProcess((config.cmd_tshark,
+                    '-r', capture_file,
+                    '-o', 'ssl.keylog_file: {}'.format(key_file),
+                    '-q',
+                    '-z', 'follow,ssl,ascii,{}'.format(stream),
+                ),
+                env=config.test_env)
+            stream += 1
+            self.assertTrue(self.grepOutput('Cipher is {}'.format(cipher)))
+
+    def test_tls13_chacha20poly1305(self):
+        '''TLS 1.3 with ChaCha20-Poly1305'''
+        if not config.have_libgcrypt17:
+            self.skipTest('Requires GCrypt 1.7 or later.')
+        capture_file = os.path.join(config.capture_dir, 'tls13-20-chacha20poly1305.pcap')
+        key_file = os.path.join(config.key_dir, 'tls13-20-chacha20poly1305.keys')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.keylog_file: {}'.format(key_file),
+                '-q',
+                '-z', 'follow,ssl,ascii,0',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('TLS13-CHACHA20-POLY1305-SHA256'))
+
+class case_decrypt_zigbee(subprocesstest.SubprocessTestCase):
+    def test_zigbee(self):
+        '''ZigBee'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7022
+        capture_file = os.path.join(config.capture_dir, 'sample_control4_2012-03-24.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'data.data',
+                '-Y', 'zbee_aps',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('30:67:63:63:38:65:20:63:34:2e:64:6d:2e:74:76:20'))
+
+class case_decrypt_ansi_c1222(subprocesstest.SubprocessTestCase):
+    def test_ansi_c1222(self):
+        '''ANSI C12.22'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=9196
+        capture_file = os.path.join(config.capture_dir, 'c1222_std_example8.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'c1222.decrypt: TRUE',
+                '-o', 'c1222.baseoid: 2.16.124.113620.1.22.0',
+                '-Tfields',
+                '-e', 'c1222.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('00:10:4d:41:4e:55:46:41:43:54:55:52:45:52:20:53:4e:20:92'))
+
+class case_decrypt_dvb_ci(subprocesstest.SubprocessTestCase):
+    def test_dvb_ci(self):
+        '''DVB-CI'''
+        # simplified version of the sample capture in
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=6700
+        capture_file = os.path.join(config.capture_dir, 'dvb-ci_UV1_0000.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'dvb-ci.sek: 00000000000000000000000000000000',
+                '-o', 'dvb-ci.siv: 00000000000000000000000000000000',
+                '-Tfields',
+                '-e', 'dvb-ci.cc.sac.padding',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('80:00:00:00:00:00:00:00:00:00:00:00'))
+
+class case_decrypt_ipsec(subprocesstest.SubprocessTestCase):
+    def test_ipsec_esp(self):
+        '''IPsec ESP'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671
+        capture_file = os.path.join(config.capture_dir, 'esp-bug-12671.pcapng.gz')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'esp.enable_encryption_decode: TRUE',
+                '-Tfields',
+                '-e', 'data.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17'))
+
+class case_decrypt_ike_isakmp(subprocesstest.SubprocessTestCase):
+    def test_ikev1_certs(self):
+        '''IKEv1 (ISAKMP) with certificates'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7951
+        capture_file = os.path.join(config.capture_dir, 'ikev1-certs.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'x509sat.printableString',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('OpenSwan'))
+
+    def test_ikev1_simultaneous(self):
+        '''IKEv1 (ISAKMP) simultaneous exchanges'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12610
+        capture_file = os.path.join(config.capture_dir, 'ikev1-bug-12610.pcapng.gz')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.hash',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('b5:25:21:f7:74:96:74:02:c9:f6:ce:e9:5f:d1:7e:5b'))
+
+    def test_ikev1_unencrypted(self):
+        '''IKEv1 (ISAKMP) unencrypted phase 1'''
+        # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12620
+        capture_file = os.path.join(config.capture_dir, 'ikev1-bug-12620.pcapng.gz')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.hash',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('40:04:3b:64:0f:43:73:25:0d:5a:c3:a1:fb:63:15:3c'))
+
+    def test_ikev2_3des_sha160(self):
+        '''IKEv2 decryption test (3DES-CBC/SHA1_160)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-3des-sha1_160.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('02:f7:a0:d5:f1:fd:c8:ea:81:03:98:18:c6:5b:b9:bd:09:af:9b:89:17:31:9b:88:7f:f9:ba:30:46:c3:44:c7'))
+
+    def test_ikev2_aes128_ccm12(self):
+        '''IKEv2 decryption test (AES-128-CCM-12) - with CBC-MAC verification'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes128ccm12.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('c2:10:43:94:29:9e:1f:fe:79:08:ea:72:0a:d5:d1:37:17:a0:d4:54:e4:fa:0a:21:28:ea:68:94:11:f4:79:c4'))
+
+    def test_ikev2_aes128_ccm12_2(self):
+        '''IKEv2 decryption test (AES-128-CCM-12 using CTR mode, without checksum)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes128ccm12-2.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('aa:a2:81:c8:7b:4a:19:04:6c:57:27:1d:55:74:88:ca:41:3b:57:22:8c:b9:51:f5:fa:96:40:99:2a:02:85:b9'))
+
+    def test_ikev2_aes192ctr_sha512(self):
+        '''IKEv2 decryption test (AES-192-CTR/SHA2-512)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes192ctr.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('3e:c2:3d:cf:93:48:48:56:38:40:7c:75:45:47:ae:b3:08:52:90:08:2c:49:f5:83:fd:ba:e5:92:63:a2:0b:4a'))
+
+    def test_ikev2_aes256cbc_sha256(self):
+        '''IKEv2 decryption test (AES-256-CBC/SHA2-256)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256cbc.pcapng')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('e1:a8:d5:50:06:42:01:a7:ec:02:4a:85:75:8d:06:73:c6:1c:5c:51:0a:c1:3b:cd:22:5d:63:27:f5:0d:a3:d3'))
+
+    def test_ikev2_aes256ccm16(self):
+        '''IKEv2 decryption test (AES-256-CCM-16)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256ccm16.pcapng')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('fa:2e:74:bd:c0:1e:30:fb:0b:3d:dc:97:23:c9:44:90:95:96:9d:a5:1f:69:e5:60:20:9d:2c:2b:79:40:21:0a'))
+
+    def test_ikev2_aes256gcm16(self):
+        '''IKEv2 decryption test (AES-256-GCM-16)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256gcm16.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('9a:b7:1f:14:ab:55:3c:ad:87:3a:1a:a7:0b:99:df:15:5d:ee:77:cd:cf:36:94:b3:b7:52:7a:cb:b9:71:2d:ed'))
+
+    def test_ikev2_aes256gcm8(self):
+        '''IKEv2 decryption test (AES-256-GCM-8)'''
+        capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256gcm8.pcap')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'isakmp.auth.data',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('4a:66:d8:22:d0:af:bc:22:ad:9a:92:a2:cf:42:87:c9:20:ad:8a:c3:b0:69:a4:a7:e7:5f:e0:a5:d4:99:f9:14'))
+
+class case_decrypt_http2(subprocesstest.SubprocessTestCase):
+    def test_http2(self):
+        '''HTTP2 (HPACK)'''
+        if not config.have_nghttp2:
+            self.skipTest('Requires nghttp2.')
+        capture_file = os.path.join(config.capture_dir, 'packet-h2-14_headers.pcapng')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-Tfields',
+                '-e', 'http2.header.value',
+                '-d', 'tcp.port==3000,http2',
+            ),
+            env=config.test_env)
+        test_passed = self.grepOutput('nghttp2')
+        if not test_passed:
+            self.log_fd.write(u'\n\n-- Verbose output --\n\n')
+            self.runProcess((config.cmd_tshark,
+                    '-r', capture_file,
+                    '-V',
+                    '-d', 'tcp.port==3000,http2',
+                ),
+                env=config.test_env)
+        self.assertTrue(test_passed)
+
+class case_decrypt_kerberos(subprocesstest.SubprocessTestCase):
+    def test_kerberos(self):
+        '''Kerberos'''
+        # Files are from krb-816.zip on the SampleCaptures page.
+        if not config.have_kerberos:
+            self.skipTest('Requires nghttp2.')
+        capture_file = os.path.join(config.capture_dir, 'krb-816.pcap.gz')
+        keytab_file = os.path.join(config.key_dir, 'krb-816.keytab')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'kerberos.decrypt: TRUE',
+                '-o', 'kerberos.file: {}'.format(keytab_file),
+                '-Tfields',
+                '-e', 'kerberos.keyvalue',
+            ),
+            env=config.test_env)
+        # keyvalue: ccda7d48219f73c3b28311c4ba7242b3
+        self.assertTrue(self.grepOutput('cc:da:7d:48:21:9f:73:c3:b2:83:11:c4:ba:72:42:b3'))
diff --git a/test/suite_dissection.py b/test/suite_dissection.py
new file mode 100644 (file)
index 0000000..cfdab92
--- /dev/null
@@ -0,0 +1,30 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Dissection tests'''
+
+import config
+import os.path
+import subprocesstest
+import unittest
+
+class case_dissect_http2(subprocesstest.SubprocessTestCase):
+    def test_http2_data_reassembly(self):
+        '''HTTP2 data reassembly'''
+        if not config.have_nghttp2:
+            self.skipTest('Requires nghttp2.')
+        capture_file = os.path.join(config.capture_dir, 'http2-data-reassembly.pcap')
+        key_file = os.path.join(config.key_dir, 'http2-data-reassembly.keys')
+        self.runProcess((config.cmd_tshark,
+                '-r', capture_file,
+                '-o', 'ssl.keylog_file: {}'.format(key_file),
+                '-d', 'tcp.port==8443,ssl',
+                '-Y', 'http2.data.data matches "PNG" && http2.data.data matches "END"',
+            ),
+            env=config.test_env)
+        self.assertTrue(self.grepOutput('DATA'))
diff --git a/test/test.py b/test/test.py
new file mode 100755 (executable)
index 0000000..f53d90a
--- /dev/null
@@ -0,0 +1,120 @@
+#!/usr/bin/env python
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Main test script'''
+
+# To do:
+# - Avoid printing Python tracebacks when we assert? It looks like we'd need
+#   to override unittest.TextTestResult.addFailure().
+
+import argparse
+import config
+import os.path
+import sys
+import unittest
+
+def find_test_ids(suite, all_ids):
+    if hasattr(suite, '__iter__'):
+        for s in suite:
+            find_test_ids(s, all_ids)
+    else:
+        all_ids.append(suite.id())
+
+def dump_failed_output(suite):
+    if hasattr(suite, '__iter__'):
+        for s in suite:
+            dump_failures = getattr(s, 'dump_failures', None)
+            if dump_failures:
+                dump_failures()
+            else:
+                dump_failed_output(s)
+
+def main():
+    parser = argparse.ArgumentParser(description='Wireshark unit tests')
+    cap_group = parser.add_mutually_exclusive_group()
+    cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests')
+    cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests')
+    cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name')
+    parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.')
+    list_group = parser.add_mutually_exclusive_group()
+    list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.')
+    list_group.add_argument('--list-suites', action='store_true', help='List all suites.')
+    list_group.add_argument('--list-cases', action='store_true', help='List all suites and cases.')
+    parser.add_argument('-v', '--verbose', action='store_const', const=2, default=1, help='Verbose tests.')
+    parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".')
+    args = parser.parse_args()
+
+    if args.enable_capture:
+        config.setCanCapture(True)
+    elif args.disable_capture:
+        config.setCanCapture(False)
+
+    if args.capture_interface:
+        config.setCaptureInterface(args.capture_interface[0])
+
+    all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*.py')
+
+    all_ids = []
+    find_test_ids(all_tests, all_ids)
+
+    run_ids = []
+    for tid in all_ids:
+        for ttr in args.tests_to_run:
+            ttrl = ttr.lower()
+            if ttrl == 'all':
+                run_ids = all_ids
+                break
+            if ttrl in tid.lower():
+                run_ids.append(tid)
+
+    if not run_ids:
+        print('No tests found. You asked for:\n  ' + '\n  '.join(args.tests_to_run))
+        parser.print_usage()
+        sys.exit(1)
+
+    if args.list:
+        print('\n'.join(run_ids))
+        sys.exit(0)
+
+    if args.list_suites:
+        suites = set()
+        for rid in run_ids:
+            rparts = rid.split('.')
+            suites |= {rparts[0]}
+        print('\n'.join(list(suites)))
+        sys.exit(0)
+
+    if args.list_cases:
+        cases = set()
+        for rid in run_ids:
+            rparts = rid.split('.')
+            cases |= {'.'.join(rparts[:2])}
+        print('\n'.join(list(cases)))
+        sys.exit(0)
+
+    program_path = args.program_path[0]
+    if not config.setProgramPath(program_path):
+        print('One or more required executables not found at {}\n'.format(program_path))
+        parser.print_usage()
+        sys.exit(1)
+
+    run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
+    runner = unittest.TextTestRunner(verbosity=args.verbose)
+    test_result = runner.run(run_suite)
+
+    dump_failed_output(run_suite)
+
+    if test_result.errors:
+        sys.exit(2)
+
+    if test_result.failures:
+        sys.exit(1)
+
+if __name__ == '__main__':
+    main()
diff --git a/test/util_slow_dhcp_pcap.py b/test/util_slow_dhcp_pcap.py
new file mode 100644 (file)
index 0000000..0adff51
--- /dev/null
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Write captures/dhcp.pcap to stdout, pause 1.5 seconds, and write it again.'''
+
+import os
+import os.path
+import time
+import sys
+
+dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
+
+dhcp_fd = open(dhcp_pcap, 'rb')
+contents = dhcp_fd.read()
+os.write(1, contents)
+time.sleep(1.5)
+os.write(1, contents[24:])