test: finalize suite_capture conversion to fixtures, drop config.py
authorPeter Wu <peter@lekensteyn.nl>
Fri, 16 Nov 2018 01:28:32 +0000 (02:28 +0100)
committerPeter Wu <peter@lekensteyn.nl>
Fri, 16 Nov 2018 13:55:40 +0000 (13:55 +0000)
Convert the old start_pinging routine to use pytest fixtures, rewriting
it to enable a different generator that uses (for example) UDP.
Remove the config module since it is no longer neded.

Change-Id: Ic4727157faab084b41144e8f16ea44f59c9037d8
Reviewed-on: https://code.wireshark.org/review/30659
Petri-Dish: Peter Wu <peter@lekensteyn.nl>
Tested-by: Petri Dish Buildbot
Reviewed-by: Peter Wu <peter@lekensteyn.nl>
test/config.py [deleted file]
test/fixtures_ws.py
test/subprocesstest.py
test/suite_capture.py

diff --git a/test/config.py b/test/config.py
deleted file mode 100644 (file)
index 87aa127..0000000
+++ /dev/null
@@ -1,257 +0,0 @@
-#
-# -*- coding: utf-8 -*-
-# Wireshark tests
-# By Gerald Combs <gerald@wireshark.org>
-#
-# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
-#
-# SPDX-License-Identifier: GPL-2.0-or-later
-#
-'''Configuration'''
-
-import logging
-import os
-import os.path
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
-
-commands = (
-    'capinfos',
-    'dumpcap',
-    'mergecap',
-    'rawshark',
-    'sharkd',
-    'text2pcap',
-    'tshark',
-    'wireshark',
-)
-
-can_capture = False
-capture_interface = None
-
-# Our executables
-program_path = None
-# Strings
-cmd_capinfos = None
-cmd_dumpcap = None
-cmd_mergecap = None
-cmd_rawshark = None
-cmd_tshark = None
-cmd_text2pcap = None
-cmd_wireshark = None
-# Arrays
-args_ping = None
-
-have_lua = False
-have_nghttp2 = False
-have_kerberos = False
-have_libgcrypt16 = False
-have_libgcrypt17 = False
-
-test_env = None
-program_path = None
-home_path = None
-conf_path = None
-custom_profile_path = None
-custom_profile_name = 'Custom Profile'
-
-this_dir = os.path.dirname(__file__)
-baseline_dir = os.path.join(this_dir, 'baseline')
-capture_dir = os.path.join(this_dir, 'captures')
-config_dir = os.path.join(this_dir, 'config')
-key_dir = os.path.join(this_dir, 'keys')
-lua_dir = os.path.join(this_dir, 'lua')
-tools_dir = os.path.join(this_dir, '..', 'tools')
-
-all_groups = []
-
-def canCapture():
-    # XXX This appears to be evaluated at the wrong time when called
-    # from a unittest.skipXXX decorator.
-    return can_capture and capture_interface is not None
-
-def setCanCapture(new_cc):
-    can_capture = new_cc
-
-def setCaptureInterface(iface):
-    global capture_interface
-    setCanCapture(True)
-    capture_interface = iface
-
-def canMkfifo():
-    return not sys.platform.startswith('win32')
-
-def canDisplay():
-    if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
-        return True
-    # Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide.
-    return False
-
-def getTsharkInfo():
-    global have_lua
-    global have_nghttp2
-    global have_kerberos
-    global have_libgcrypt16
-    global have_libgcrypt17
-    if not cmd_tshark:
-        logging.warning("tshark binary is not yet set")
-        return
-    try:
-        tshark_v = subprocess.check_output(
-            (cmd_tshark, '--version'),
-            stderr=subprocess.PIPE,
-            universal_newlines=True,
-            env=baseEnv()
-        ).replace('\n', ' ')
-    except subprocess.CalledProcessError as e:
-        logging.warning("Failed to detect tshark features: %s", e)
-        tshark_v = ''
-    have_lua = bool(re.search('with +Lua', tshark_v))
-    have_nghttp2 = bool(re.search('with +nghttp2', tshark_v))
-    have_kerberos = bool(re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v))
-    gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v)
-    have_libgcrypt16 = gcry_m and float(gcry_m.group(1)) >= 1.6
-    have_libgcrypt17 = gcry_m and float(gcry_m.group(1)) >= 1.7
-
-def getDefaultCaptureInterface():
-    '''Choose a default capture interface for our platform. Currently Windows only.'''
-    global capture_interface
-    if capture_interface:
-        return
-    if cmd_dumpcap is None:
-        return
-    if not sys.platform.startswith('win32'):
-        return
-    try:
-        dumpcap_d_data = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE)
-        dumpcap_d_stdout = dumpcap_d_data.decode('UTF-8', 'replace')
-        for d_line in dumpcap_d_stdout.splitlines():
-            iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line)
-            if iface_m:
-                capture_interface = iface_m.group(1)
-                break
-    except:
-        pass
-
-def getPingCommand():
-    '''Return an argument list required to ping www.wireshark.org for 60 seconds.'''
-    global args_ping
-    # XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds.
-    if sys.platform.startswith('win32'):
-        # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
-        args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
-    elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
-        args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
-    elif sys.platform.startswith('darwin'):
-        args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
-    # XXX Other BSDs, Solaris, etc
-
-def setProgramPath(path):
-    global program_path
-    program_path = path
-    retval = True
-    dotexe = ''
-    if sys.platform.startswith('win32'):
-        dotexe = '.exe'
-    for cmd in commands:
-        cmd_var = 'cmd_' + cmd
-        cmd_path = os.path.normpath(os.path.join(path, cmd + dotexe))
-        if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK):
-            cmd_path = None
-            program_path = None
-            retval = False
-        globals()[cmd_var] = cmd_path
-    getTsharkInfo()
-    getDefaultCaptureInterface()
-    setUpHostFiles()
-    return retval
-
-def baseEnv(home=None):
-    """A modified environment to ensure reproducible tests."""
-    env = os.environ.copy()
-    env['TZ'] = 'UTC'
-    home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
-    if home:
-        env[home_env] = home
-    else:
-        # This directory is supposed not to be written and is used by
-        # "readonly" tests that do not read any other preferences.
-        env[home_env] = "/wireshark-tests-unused"
-    return env
-
-def setUpTestEnvironment():
-    global home_path
-    global conf_path
-    global custom_profile_path
-    global test_env
-
-    # Create our directories
-    test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.')
-    home_path = os.path.join(test_confdir, 'home')
-    if sys.platform.startswith('win32'):
-        conf_path = os.path.join(home_path, 'Wireshark')
-    else:
-        conf_path = os.path.join(home_path, '.config', 'wireshark')
-    os.makedirs(conf_path)
-    # Test spaces while we're here.
-    custom_profile_path = os.path.join(conf_path, 'profiles', custom_profile_name)
-    os.makedirs(custom_profile_path)
-
-    # Populate our UAT files
-    uat_files = [
-        '80211_keys',
-        'dtlsdecrypttablefile',
-        'esp_sa',
-        'ssl_keys',
-        'c1222_decryption_table',
-        'ikev1_decryption_table',
-        'ikev2_decryption_table',
-    ]
-    for uat in uat_files:
-        setUpUatFile(conf_path, uat)
-
-    # Set up our environment
-    test_env = baseEnv(home=home_path)
-    test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = 'True'
-    test_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
-
-def setUpUatFile(conf_path, conf_file):
-    template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl'
-    with open(template, 'r') as tplt_fd:
-        tplt_contents = tplt_fd.read()
-        tplt_fd.close()
-        key_dir_path = os.path.join(key_dir, '')
-        # uat.c replaces backslashes...
-        key_dir_path = key_dir_path.replace('\\', '\\x5c')
-        cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path)
-    out_file = os.path.join(conf_path, conf_file)
-    with open(out_file, 'w') as cf_fd:
-        cf_fd.write(cf_contents)
-        cf_fd.close()
-
-def setUpHostFiles():
-    global program_path
-    global conf_path
-    global custom_profile_path
-    if program_path is None:
-        return
-    if conf_path is None or custom_profile_path is None:
-        setUpTestEnvironment()
-    bundle_path = os.path.join(program_path, 'Wireshark.app', 'Contents', 'MacOS')
-    if os.path.isdir(bundle_path):
-        global_path = bundle_path
-    else:
-        global_path = program_path
-    hosts_path_pfx = os.path.join(this_dir, 'hosts.')
-    shutil.copyfile(hosts_path_pfx + 'global', os.path.join(global_path, 'hosts'))
-    shutil.copyfile(hosts_path_pfx + 'personal', os.path.join(conf_path, 'hosts'))
-    shutil.copyfile(hosts_path_pfx + 'custom', os.path.join(custom_profile_path, 'hosts'))
-
-if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
-    can_capture = True
-
-# Initialize ourself.
-getPingCommand()
index 55e3e2151c042a4623716364b17af4427bd6a080..053d89e0ceb68da7661b0f12ff3e39587ac52a68 100644 (file)
@@ -17,7 +17,6 @@ import tempfile
 import types
 
 import fixtures
-import config
 import subprocesstest
 
 
@@ -126,15 +125,14 @@ def wireshark_command(cmd_wireshark):
 
 
 @fixtures.fixture(scope='session')
-def features(cmd_tshark):
+def features(cmd_tshark, make_env):
     '''Returns an object describing available features in tshark.'''
     try:
-        # XXX stop using config
         tshark_v = subprocess.check_output(
             (cmd_tshark, '--version'),
             stderr=subprocess.PIPE,
             universal_newlines=True,
-            env=config.baseEnv()
+            env=make_env()
         )
         tshark_v = re.sub(r'\s+', ' ', tshark_v)
     except subprocess.CalledProcessError as ex:
@@ -190,14 +188,28 @@ def conf_path(home_path):
     return conf_path
 
 
+@fixtures.fixture(scope='session')
+def make_env():
+    """A factory for a modified environment to ensure reproducible tests."""
+    def make_env_real(home=None):
+        env = os.environ.copy()
+        env['TZ'] = 'UTC'
+        home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
+        if home:
+            env[home_env] = home
+        else:
+            # This directory is supposed not to be written and is used by
+            # "readonly" tests that do not read any other preferences.
+            env[home_env] = "/wireshark-tests-unused"
+        return env
+    return make_env_real
+
+
 @fixtures.fixture
-def base_env(home_path, request):
+def base_env(home_path, make_env, request):
     """A modified environment to ensure reproducible tests. Tests can modify
     this environment as they see fit."""
-    env = os.environ.copy()
-    env['TZ'] = 'UTC'
-    home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
-    env[home_env] = home_path
+    env = make_env(home=home_path)
 
     # Remove this if test instances no longer inherit from SubprocessTestCase?
     if isinstance(request.instance, subprocesstest.SubprocessTestCase):
@@ -207,7 +219,7 @@ def base_env(home_path, request):
 
 
 @fixtures.fixture
-def test_env(base_env, conf_path, request):
+def test_env(base_env, conf_path, request, dirs):
     '''A process environment with a populated configuration directory.'''
     # Populate our UAT files
     uat_files = [
@@ -219,9 +231,16 @@ def test_env(base_env, conf_path, request):
         'ikev1_decryption_table',
         'ikev2_decryption_table',
     ]
+    # uat.c replaces backslashes...
+    key_dir_path = os.path.join(dirs.key_dir, '').replace('\\', '\\x5c')
     for uat in uat_files:
-        # XXX stop using config
-        config.setUpUatFile(conf_path, uat)
+        template_file = os.path.join(dirs.config_dir, uat + '.tmpl')
+        out_file = os.path.join(conf_path, uat)
+        with open(template_file, 'r') as f:
+            template_contents = f.read()
+        cf_contents = template_contents.replace('TEST_KEYS_DIR', key_dir_path)
+        with open(out_file, 'w') as f:
+            f.write(cf_contents)
 
     env = base_env
     env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1'
index 060ebeb3a47cda01b55f3d3f139030151dba523e..9f6b2f001f019faab2327d9a65048868e8aab7e8 100644 (file)
@@ -23,8 +23,6 @@ import unittest
 # - Add a subprocesstest.SkipUnlessCapture decorator?
 # - Try to catch crashes? See the comments below in waitProcess.
 
-# XXX This should probably be in config.py and settable from
-# the command line.
 process_timeout = 300 # Seconds
 
 def cat_dhcp_command(mode):
@@ -33,7 +31,8 @@ def cat_dhcp_command(mode):
     sd_cmd = ''
     if sys.executable:
         sd_cmd = '"{}" '.format(sys.executable)
-    sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
+    this_dir = os.path.dirname(__file__)
+    sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
     return sd_cmd
 
 class LoggingPopen(subprocess.Popen):
@@ -256,9 +255,6 @@ class SubprocessTestCase(unittest.TestCase):
             # fixture (via a test method parameter or class decorator).
             assert not (env is None and hasattr(self, '_fixture_request')), \
                 "Decorate class with @fixtures.mark_usefixtures('test_env')"
-        if env is None:
-            # Avoid using the test user's real environment by default.
-            env = config.test_env
         proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
         self.processes.append(proc)
         return proc
index a9ac1aa9c0cde6d9f810e3f1aee002c9deb9d218..1eaf4ad621a84feb32aca60a44ca90b2fb89bf03 100644 (file)
@@ -9,7 +9,6 @@
 #
 '''Capture tests'''
 
-import config
 import glob
 import os
 import subprocess
@@ -24,20 +23,48 @@ capture_duration = 5
 testout_pcap = 'testout.pcap'
 snapshot_len = 96
 
-def start_pinging(self):
-    ping_procs = []
+@fixtures.fixture
+def traffic_generator():
+    '''
+    Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
+    where cfilter is a capture filter to match the generated traffic.
+    start_func can be invoked to start generating traffic and returns a function
+    which can be used to stop traffic generation early.
+    Currently calls ping www.wireshark.org for 60 seconds.
+    '''
+    # XXX replace this by something that generates UDP traffic to localhost?
+    # That would avoid external access which is forbidden by the Debian policy.
+    nprocs = 1
     if sys.platform.startswith('win32'):
-        # Fake '-i' with a subsecond interval.
-        for st in (0.1, 0.1, 0):
-            ping_procs.append(self.startProcess(config.args_ping))
-            time.sleep(st)
+        # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
+        args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
+        nprocs = 3
+    elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
+        args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
+    elif sys.platform.startswith('darwin'):
+        args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
     else:
-        ping_procs.append(self.startProcess(config.args_ping))
-    return ping_procs
-
-def stop_pinging(ping_procs):
-    for proc in ping_procs:
-        proc.kill()
+        # XXX Other BSDs, Solaris, etc
+        fixtures.skip('ping utility is unavailable - cannot generate traffic')
+    procs = []
+    def kill_processes():
+        for proc in procs:
+            proc.kill()
+        for proc in procs:
+            proc.wait()
+        procs.clear()
+    def start_processes():
+        for i in range(nprocs):
+            if i > 0:
+                # Fake subsecond interval if the ping utility lacks support.
+                time.sleep(0.1)
+            proc = subprocess.Popen(args_ping, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
+            procs.append(proc)
+        return kill_processes
+    try:
+        yield start_processes, 'icmp || icmp6'
+    finally:
+        kill_processes()
 
 
 @fixtures.fixture(scope='session')
@@ -54,14 +81,12 @@ def capture_command(*cmd_args, shell=False):
 
 
 @fixtures.fixture
-def check_capture_10_packets(capture_interface, cmd_dumpcap):
+def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
+    start_traffic, cfilter = traffic_generator
     def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
-        # Similar to suite_io.check_io_4_packets.
-        if not config.args_ping:
-            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
         self.assertIsNotNone(cmd)
         testout_file = self.filename_from_id(testout_pcap)
-        ping_procs = start_pinging(self)
+        stop_traffic = start_traffic()
         if to_stdout:
             capture_proc = self.runProcess(capture_command(cmd,
                 '-i', '"{}"'.format(capture_interface),
@@ -69,7 +94,7 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
                 '-w', '-',
                 '-c', '10',
                 '-a', 'duration:{}'.format(capture_duration),
-                '-f', '"icmp || icmp6"',
+                '-f', '"{}"'.format(cfilter),
                 '>', testout_file,
                 shell=True
             ),
@@ -82,10 +107,10 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
                 '-w', testout_file,
                 '-c', '10',
                 '-a', 'duration:{}'.format(capture_duration),
-                '-f', 'icmp || icmp6',
+                '-f', cfilter,
             ))
+        stop_traffic()
         capture_returncode = capture_proc.returncode
-        stop_pinging(ping_procs)
         if capture_returncode != 0:
             self.log_fd.write('{} -D output:\n'.format(cmd))
             self.runProcess((cmd, '-D'))
@@ -160,13 +185,12 @@ def check_capture_stdin(cmd_dumpcap):
 
 
 @fixtures.fixture
-def check_capture_read_filter(capture_interface):
+def check_capture_read_filter(capture_interface, traffic_generator):
+    start_traffic, cfilter = traffic_generator
     def check_capture_read_filter_real(self, cmd=None):
-        if not config.args_ping:
-            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
         self.assertIsNotNone(cmd)
-        ping_procs = start_pinging(self)
         testout_file = self.filename_from_id(testout_pcap)
+        stop_traffic = start_traffic()
         capture_proc = self.runProcess(capture_command(cmd,
             '-i', capture_interface,
             '-p',
@@ -175,10 +199,10 @@ def check_capture_read_filter(capture_interface):
             '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
             '-c', '10',
             '-a', 'duration:{}'.format(capture_duration),
-            '-f', 'icmp || icmp6',
+            '-f', cfilter,
         ))
+        stop_traffic()
         capture_returncode = capture_proc.returncode
-        stop_pinging(ping_procs)
         self.assertEqual(capture_returncode, 0)
 
         if (capture_returncode == 0):
@@ -186,12 +210,11 @@ def check_capture_read_filter(capture_interface):
     return check_capture_read_filter_real
 
 @fixtures.fixture
-def check_capture_snapshot_len(capture_interface, cmd_tshark):
+def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
+    start_traffic, cfilter = traffic_generator
     def check_capture_snapshot_len_real(self, cmd=None):
-        if not config.args_ping:
-            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
         self.assertIsNotNone(cmd)
-        ping_procs = start_pinging(self)
+        stop_traffic = start_traffic()
         testout_file = self.filename_from_id(testout_pcap)
         capture_proc = self.runProcess(capture_command(cmd,
             '-i', capture_interface,
@@ -199,10 +222,10 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark):
             '-w', testout_file,
             '-s', str(snapshot_len),
             '-a', 'duration:{}'.format(capture_duration),
-            '-f', 'icmp || icmp6',
+            '-f', cfilter,
         ))
+        stop_traffic()
         capture_returncode = capture_proc.returncode
-        stop_pinging(ping_procs)
         self.assertEqual(capture_returncode, 0)
         self.assertTrue(os.path.isfile(testout_file))