test: convert capture tests to use fixtures, fix tests without dumpcap
authorPeter Wu <peter@lekensteyn.nl>
Thu, 15 Nov 2018 17:44:59 +0000 (18:44 +0100)
committerPeter Wu <peter@lekensteyn.nl>
Fri, 16 Nov 2018 13:55:28 +0000 (13:55 +0000)
Add a new --capture-interface option to pytest, similar to test.py. It
will grab some Ethernet interface on Windows. An empty value overrides
this and disables capture tests. Remove the test.py --enable-capture
option since that is implied by the --capture-interface option.

Port the `test.py --program-path` option to pytest and additionally make
the pytest look in the current working directory if neither WS_BIN_PATH
nor --program-path are specified. Drop config.setProgramPath, this
allows tests to be run even if not all binaries are available.

With all capture tests converted to fixtures, it is now possible to run
tests when Wireshark is not built with libpcap as tests that depend on
cmd_dumpcap (or capture_interface) will be skipped.

Bug: 14949
Change-Id: Ie802c07904936de4cd30a4c68b6a5139e6680fbd
Reviewed-on: https://code.wireshark.org/review/30656
Petri-Dish: Peter Wu <peter@lekensteyn.nl>
Tested-by: Petri Dish Buildbot
Reviewed-by: Peter Wu <peter@lekensteyn.nl>
test/conftest.py
test/fixtures.py
test/fixtures_ws.py
test/subprocesstest.py
test/suite_capture.py
test/suite_clopts.py
test/suite_fileformats.py
test/suite_io.py
test/suite_unittests.py
test/test.py

index a09e86c..c2646bf 100644 (file)
@@ -8,31 +8,34 @@
 #
 '''py.test configuration'''
 
-import os
-import sys
 import fixtures
-import config
 
-# XXX remove globals in config and create py.test-specific fixtures
-try:
-    _program_path = os.environ['WS_BIN_PATH']
-except KeyError:
-    print('Please set env var WS_BIN_PATH to the run directory with binaries')
-    sys.exit(1)
-if not config.setProgramPath(_program_path):
-    print('One or more required executables not found at {}\n'.format(_program_path))
-    sys.exit(1)
+def pytest_addoption(parser):
+    parser.addoption('--disable-capture', action='store_true',
+        help='Disable capture tests'
+    )
+    parser.addoption('--capture-interface',
+        help='Capture interface index or name.'
+    )
+    parser.addoption('--program-path', help='Path to Wireshark executables.')
+
+_all_test_groups = None
 
 # this is set only to please case_unittests.test_unit_ctest_coverage
 def pytest_collection_modifyitems(items):
     '''Find all test groups.'''
+    global _all_test_groups
     suites = []
     for item in items:
         name = item.nodeid.split("::")[0].replace(".py", "").replace("/", ".")
         if name not in suites:
             suites.append(name)
-    config.all_groups = list(sorted(suites))
+    _all_test_groups = sorted(suites)
 
 # Must enable pytest before importing fixtures_ws.
 fixtures.enable_pytest()
 from fixtures_ws import *
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+    return _all_test_groups
index 26c0ed3..1dbfe40 100644 (file)
@@ -7,6 +7,7 @@
 # SPDX-License-Identifier: (GPL-2.0-or-later OR MIT)
 #
 
+import argparse
 import functools
 import inspect
 import sys
@@ -197,10 +198,13 @@ class _ExecutionScope(object):
 
     def cached_result(self, spec):
         '''Obtain the cached result for a previously executed fixture.'''
-        value, exc = self._find_scope(spec.scope).cache[spec.name]
+        entry = self._find_scope(spec.scope).cache.get(spec.name)
+        if not entry:
+            return None, False
+        value, exc = entry
         if exc:
             raise exc
-        return value
+        return value, True
 
     def _execute_one(self, spec, test_fn):
         # A fixture can only execute in the same or earlier scopes
@@ -265,7 +269,15 @@ class _FixtureRequest(object):
         if not spec:
             assert argname == 'request'
             return self
-        return self._context.cached_result(spec)
+        value, ok = self._context.cached_result(spec)
+        if not ok:
+            # If getfixturevalue is called directly from a setUp function, the
+            # fixture value might not have computed before, so evaluate it now.
+            # As the test function is not available, use None.
+            self._context.execute(spec, test_fn=None)
+            value, ok = self._context.cached_result(spec)
+            assert ok, 'Failed to execute fixture %s' % (spec,)
+        return value
 
     def destroy(self):
         self._context.destroy()
@@ -277,6 +289,11 @@ class _FixtureRequest(object):
     def instance(self):
         return self.function.__self__
 
+    @property
+    def config(self):
+        '''The pytest config object associated with this request.'''
+        return _config
+
 
 def _patch_unittest_testcase_class(cls):
     '''
@@ -304,8 +321,20 @@ def _patch_unittest_testcase_class(cls):
     cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown
 
 
+class _Config(object):
+    def __init__(self, args):
+        assert isinstance(args, argparse.Namespace)
+        self.args = args
+
+    def getoption(self, name, default):
+        '''Partial emulation for pytest Config.getoption.'''
+        name = name.lstrip('-').replace('-', '_')
+        return getattr(self.args, name, default)
+
+
 _fallback = None
 _session_context = None
+_config = None
 
 
 def init_fallback_fixtures_once():
@@ -317,10 +346,14 @@ def init_fallback_fixtures_once():
     # Register standard fixtures here as needed
 
 
-def create_session():
-    global _session_context
+def create_session(args=None):
+    '''Start a test session where args is from argparse.'''
+    global _session_context, _config
     assert not _use_native_pytest
     _session_context = _ExecutionScope('session', None)
+    if args is None:
+        args = argparse.Namespace()
+    _config = _Config(args)
 
 
 def destroy_session():
index 18be136..55e3e21 100644 (file)
@@ -22,9 +22,51 @@ import subprocesstest
 
 
 @fixtures.fixture(scope='session')
-def program_path():
-    # XXX stop using config
-    return config.program_path
+def capture_interface(request, cmd_dumpcap):
+    '''
+    Name of capture interface. Tests will be skipped if dumpcap is not
+    available or if the capture interface is unknown.
+    '''
+    iface = request.config.getoption('--capture-interface', default=None)
+    disabled = request.config.getoption('--disable-capture', default=False)
+    if disabled:
+        fixtures.skip('Capture tests are disabled via --disable-capture')
+    if iface:
+        # If a non-empty interface is given, assume capturing is possible.
+        return iface
+    else:
+        if sys.platform == 'win32':
+            patterns = '.*(Ethernet|Network Connection|VMware|Intel)'
+        else:
+            patterns = None
+        if patterns:
+            try:
+                output = subprocess.check_output((cmd_dumpcap, '-D'),
+                                                 stderr=subprocess.DEVNULL,
+                                                 universal_newlines=True)
+                m = re.search(r'^(\d+)\. %s' % patterns, output, re.MULTILINE)
+                if m:
+                    return m.group(1)
+            except subprocess.CalledProcessError:
+                pass
+    fixtures.skip('Test requires capture privileges and an interface.')
+
+
+@fixtures.fixture(scope='session')
+def program_path(request):
+    '''
+    Path to the Wireshark binaries as set by the --program-path option, the
+    WS_BIN_PATH environment variable or (curdir)/run.
+    '''
+    paths = (
+        request.config.getoption('--program-path', default=None),
+        os.environ.get('WS_BIN_PATH'),
+        os.path.join(os.curdir, 'run'),
+    )
+    for path in paths:
+        if type(path) == str and os.path.isdir(path):
+            return path
+    raise AssertionError('Missing directory with Wireshark binaries')
 
 
 @fixtures.fixture(scope='session')
@@ -75,6 +117,14 @@ def cmd_wireshark(program):
     return program('wireshark')
 
 
+@fixtures.fixture(scope='session')
+def wireshark_command(cmd_wireshark):
+    if sys.platform not in ('win32', 'darwin'):
+        # TODO check DISPLAY for X11 on Linux or BSD?
+        fixtures.skip('Wireshark GUI tests requires DISPLAY')
+    return (cmd_wireshark, '-ogui.update.enabled:FALSE')
+
+
 @fixtures.fixture(scope='session')
 def features(cmd_tshark):
     '''Returns an object describing available features in tshark.'''
@@ -182,5 +232,3 @@ def test_env(base_env, conf_path, request):
         # Inject the test environment as default if it was not overridden.
         request.instance.injected_test_env = env
     return env
-
-# XXX capture: capture_interface
index 9118686..060ebeb 100644 (file)
@@ -27,23 +27,6 @@ import unittest
 # the command line.
 process_timeout = 300 # Seconds
 
-def capture_command(cmd, *args, **kwargs):
-    '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
-
-    If shell is true, return a string. Otherwise, return a list of arguments.'''
-    shell = kwargs.pop('shell', False)
-    if shell:
-        cap_cmd = ['"' + cmd + '"']
-    else:
-        cap_cmd = [cmd]
-    if cmd == config.cmd_wireshark:
-        cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
-    cap_cmd += args
-    if shell:
-        return ' '.join(cap_cmd)
-    else:
-        return cap_cmd
-
 def cat_dhcp_command(mode):
     '''Create a command string for dumping dhcp.pcap to stdout'''
     # XXX Do this in Python in a thread?
@@ -196,10 +179,12 @@ class SubprocessTestCase(unittest.TestCase):
 
         capinfos_args must be a sequence.
         Default cap_file is <test id>.testout.pcap.'''
+        # XXX convert users to use a new fixture instead of this function.
+        cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
         if not cap_file:
             cap_file = self.filename_from_id('testout.pcap')
-        self.log_fd.write('\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
-        capinfos_cmd = [config.cmd_capinfos]
+        self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
+        capinfos_cmd = [cmd_capinfos]
         if capinfos_args is not None:
             capinfos_cmd += capinfos_args
         capinfos_cmd.append(cap_file)
index eb1a4ff..a9ac1aa 100644 (file)
 import config
 import glob
 import os
-import re
 import subprocess
 import subprocesstest
 import sys
 import time
-import unittest
 import uuid
+import fixtures
 
 capture_duration = 5
 
@@ -40,331 +39,365 @@ def stop_pinging(ping_procs):
     for proc in ping_procs:
         proc.kill()
 
-def check_capture_10_packets(self, cmd=None, to_stdout=False):
-    # Similar to suite_io.check_io_4_packets.
-    if not config.canCapture():
-        self.skipTest('Test requires capture privileges and an interface.')
-    if cmd == config.cmd_wireshark and not config.canDisplay():
-        self.skipTest('Test requires a display.')
-    if not config.args_ping:
-        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
-    self.assertIsNotNone(cmd)
-    testout_file = self.filename_from_id(testout_pcap)
-    ping_procs = start_pinging(self)
-    if to_stdout:
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
-            '-i', '"{}"'.format(config.capture_interface),
+
+@fixtures.fixture(scope='session')
+def wireshark_k(wireshark_command):
+    return tuple(list(wireshark_command) + ['-k'])
+
+def capture_command(*cmd_args, shell=False):
+    if type(cmd_args[0]) != str:
+        # Assume something like ['wireshark', '-k']
+        cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
+    if shell:
+        cmd_args = ' '.join(cmd_args)
+    return cmd_args
+
+
+@fixtures.fixture
+def check_capture_10_packets(capture_interface, cmd_dumpcap):
+    def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
+        # Similar to suite_io.check_io_4_packets.
+        if not config.args_ping:
+            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+        self.assertIsNotNone(cmd)
+        testout_file = self.filename_from_id(testout_pcap)
+        ping_procs = start_pinging(self)
+        if to_stdout:
+            capture_proc = self.runProcess(capture_command(cmd,
+                '-i', '"{}"'.format(capture_interface),
+                '-p',
+                '-w', '-',
+                '-c', '10',
+                '-a', 'duration:{}'.format(capture_duration),
+                '-f', '"icmp || icmp6"',
+                '>', testout_file,
+                shell=True
+            ),
+            shell=True
+            )
+        else:
+            capture_proc = self.runProcess(capture_command(cmd,
+                '-i', capture_interface,
+                '-p',
+                '-w', testout_file,
+                '-c', '10',
+                '-a', 'duration:{}'.format(capture_duration),
+                '-f', 'icmp || icmp6',
+            ))
+        capture_returncode = capture_proc.returncode
+        stop_pinging(ping_procs)
+        if capture_returncode != 0:
+            self.log_fd.write('{} -D output:\n'.format(cmd))
+            self.runProcess((cmd, '-D'))
+        self.assertEqual(capture_returncode, 0)
+        if (capture_returncode == 0):
+            self.checkPacketCount(10)
+    return check_capture_10_packets_real
+
+
+@fixtures.fixture
+def check_capture_fifo(cmd_dumpcap):
+    if sys.platform == 'win32':
+        fixtures.skip('Test requires OS fifo support.')
+
+    def check_capture_fifo_real(self, cmd=None):
+        self.assertIsNotNone(cmd)
+        testout_file = self.filename_from_id(testout_pcap)
+        fifo_file = self.filename_from_id('testout.fifo')
+        try:
+            # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+            os.unlink(fifo_file)
+        except:
+            pass
+        os.mkfifo(fifo_file)
+        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+        fifo_proc = self.startProcess(
+            ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
+            shell=True)
+        capture_proc = self.runProcess(capture_command(cmd,
+            '-i', fifo_file,
             '-p',
-            '-w', '-',
-            '-c', '10',
+            '-w', testout_file,
+            '-a', 'duration:{}'.format(capture_duration),
+        ))
+        fifo_proc.kill()
+        self.assertTrue(os.path.isfile(testout_file))
+        capture_returncode = capture_proc.returncode
+        self.assertEqual(capture_returncode, 0)
+        if (capture_returncode == 0):
+            self.checkPacketCount(8)
+    return check_capture_fifo_real
+
+
+@fixtures.fixture
+def check_capture_stdin(cmd_dumpcap):
+    # Capturing always requires dumpcap, hence the dependency on it.
+    def check_capture_stdin_real(self, cmd=None):
+        # Similar to suite_io.check_io_4_packets.
+        self.assertIsNotNone(cmd)
+        testout_file = self.filename_from_id(testout_pcap)
+        slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+        capture_cmd = capture_command(cmd,
+            '-i', '-',
+            '-w', testout_file,
             '-a', 'duration:{}'.format(capture_duration),
-            '-f', '"icmp || icmp6"',
-            '>', testout_file,
             shell=True
-        ),
-        shell=True
         )
-    else:
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
-            '-i', config.capture_interface,
+        is_gui = type(cmd) != str and '-k' in cmd[0]
+        if is_gui:
+            capture_cmd += ' -o console.log.level:127'
+        pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+        pipe_returncode = pipe_proc.returncode
+        self.assertEqual(pipe_returncode, 0)
+        if is_gui:
+            self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
+            self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
+            self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
+        self.assertTrue(os.path.isfile(testout_file))
+        if (pipe_returncode == 0):
+            self.checkPacketCount(8)
+    return check_capture_stdin_real
+
+
+@fixtures.fixture
+def check_capture_read_filter(capture_interface):
+    def check_capture_read_filter_real(self, cmd=None):
+        if not config.args_ping:
+            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+        self.assertIsNotNone(cmd)
+        ping_procs = start_pinging(self)
+        testout_file = self.filename_from_id(testout_pcap)
+        capture_proc = self.runProcess(capture_command(cmd,
+            '-i', capture_interface,
             '-p',
             '-w', testout_file,
+            '-2',
+            '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
             '-c', '10',
             '-a', 'duration:{}'.format(capture_duration),
             '-f', 'icmp || icmp6',
         ))
-    capture_returncode = capture_proc.returncode
-    stop_pinging(ping_procs)
-    if capture_returncode != 0:
-        self.log_fd.write('{} -D output:\n'.format(cmd))
-        self.runProcess((cmd, '-D'))
-    self.assertEqual(capture_returncode, 0)
-    if (capture_returncode == 0):
-        self.checkPacketCount(10)
-
-def check_capture_fifo(self, cmd=None):
-    if not config.canMkfifo():
-        self.skipTest('Test requires OS fifo support.')
-    if cmd == config.cmd_wireshark and not config.canDisplay():
-        self.skipTest('Test requires a display.')
-    self.assertIsNotNone(cmd)
-    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
-    testout_file = self.filename_from_id(testout_pcap)
-    fifo_file = self.filename_from_id('testout.fifo')
-    try:
-        # If a previous test left its fifo laying around, e.g. from a failure, remove it.
-        os.unlink(fifo_file)
-    except:
-        pass
-    os.mkfifo(fifo_file)
-    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
-    fifo_proc = self.startProcess(
-        ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
-        shell=True)
-    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
-        '-i', fifo_file,
-        '-p',
-        '-w', testout_file,
-        '-a', 'duration:{}'.format(capture_duration),
-    ))
-    fifo_proc.kill()
-    self.assertTrue(os.path.isfile(testout_file))
-    capture_returncode = capture_proc.returncode
-    self.assertEqual(capture_returncode, 0)
-    if (capture_returncode == 0):
-        self.checkPacketCount(8)
-
-def check_capture_stdin(self, cmd=None):
-    # Similar to suite_io.check_io_4_packets.
-    if cmd == config.cmd_wireshark and not config.canDisplay():
-        self.skipTest('Test requires a display.')
-    self.assertIsNotNone(cmd)
-    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
-    testout_file = self.filename_from_id(testout_pcap)
-    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
-    capture_cmd = subprocesstest.capture_command(cmd,
-        '-i', '-',
-        '-w', testout_file,
-        '-a', 'duration:{}'.format(capture_duration),
-        shell=True
-    )
-    if cmd == config.cmd_wireshark:
-        capture_cmd += ' -o console.log.level:127'
-    pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
-    pipe_returncode = pipe_proc.returncode
-    self.assertEqual(pipe_returncode, 0)
-    if cmd == config.cmd_wireshark:
-        self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
-        self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
-        self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
-    self.assertTrue(os.path.isfile(testout_file))
-    if (pipe_returncode == 0):
-        self.checkPacketCount(8)
-
-def check_capture_read_filter(self, cmd=None):
-    if not config.canCapture():
-        self.skipTest('Test requires capture privileges and an interface.')
-    if cmd == config.cmd_wireshark and not config.canDisplay():
-        self.skipTest('Test requires a display.')
-    if not config.args_ping:
-        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
-    self.assertIsNotNone(cmd)
-    ping_procs = start_pinging(self)
-    testout_file = self.filename_from_id(testout_pcap)
-    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
-        '-i', config.capture_interface,
-        '-p',
-        '-w', testout_file,
-        '-2',
-        '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
-        '-c', '10',
-        '-a', 'duration:{}'.format(capture_duration),
-        '-f', 'icmp || icmp6',
-    ))
-    capture_returncode = capture_proc.returncode
-    stop_pinging(ping_procs)
-    self.assertEqual(capture_returncode, 0)
-
-    if (capture_returncode == 0):
-        self.checkPacketCount(0)
-
-def check_capture_snapshot_len(self, cmd=None):
-    if not config.canCapture():
-        self.skipTest('Test requires capture privileges and an interface.')
-    if cmd == config.cmd_wireshark and not config.canDisplay():
-        self.skipTest('Test requires a display.')
-    if not config.args_ping:
-        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
-    self.assertIsNotNone(cmd)
-    ping_procs = start_pinging(self)
-    testout_file = self.filename_from_id(testout_pcap)
-    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
-        '-i', config.capture_interface,
-        '-p',
-        '-w', testout_file,
-        '-s', str(snapshot_len),
-        '-a', 'duration:{}'.format(capture_duration),
-        '-f', 'icmp || icmp6',
-    ))
-    capture_returncode = capture_proc.returncode
-    stop_pinging(ping_procs)
-    self.assertEqual(capture_returncode, 0)
-    self.assertTrue(os.path.isfile(testout_file))
-
-    # Use tshark to filter out all packets larger than 68 bytes.
-    testout2_file = self.filename_from_id('testout2.pcap')
-
-    filter_proc = self.runProcess((config.cmd_tshark,
-        '-r', testout_file,
-        '-w', testout2_file,
-        '-Y', 'frame.cap_len>{}'.format(snapshot_len),
-    ))
-    filter_returncode = filter_proc.returncode
-    self.assertEqual(capture_returncode, 0)
-    if (capture_returncode == 0):
-        self.checkPacketCount(0, cap_file=testout2_file)
-
-def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
-    # Similar to check_capture_stdin.
-    cmd = config.cmd_dumpcap
-    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
-    testout_file = self.filename_from_id(testout_pcap)
-    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
-    condition='oops:invalid'
-
-    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
-    self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
-    if packets is not None:
-        condition = 'packets:{}'.format(packets)
-    elif filesize is not None:
-        condition = 'filesize:{}'.format(filesize)
-
-    capture_cmd = subprocesstest.capture_command(cmd,
-        '-i', '-',
-        '-w', testout_file,
-        '-a', condition,
-        shell=True
-    )
-    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
-    pipe_returncode = pipe_proc.returncode
-    self.assertEqual(pipe_returncode, 0)
-    self.assertTrue(os.path.isfile(testout_file))
-    if (pipe_returncode != 0):
-        return
-
-    if packets is not None:
-        self.checkPacketCount(packets)
-    elif filesize is not None:
-        capturekb = os.path.getsize(testout_file) / 1000
-        self.assertGreaterEqual(capturekb, filesize)
-
-def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
-    # Similar to check_capture_stdin.
-    cmd = config.cmd_dumpcap
-    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
-    rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
-    testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
-    testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
-    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
-    condition='oops:invalid'
-
-    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
-    self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
-    if packets is not None:
-        condition = 'packets:{}'.format(packets)
-    elif filesize is not None:
-        condition = 'filesize:{}'.format(filesize)
-
-    capture_cmd = subprocesstest.capture_command(cmd,
-        '-i', '-',
-        '-w', testout_file,
-        '-a', 'files:2',
-        '-b', condition,
-        shell=True
-    )
-    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
-    pipe_returncode = pipe_proc.returncode
-    self.assertEqual(pipe_returncode, 0)
-    if (pipe_returncode != 0):
-        return
-
-    rb_files = glob.glob(testout_glob)
-    for rbf in rb_files:
-        self.cleanup_files.append(rbf)
-
-    self.assertEqual(len(rb_files), 2)
-
-    for rbf in rb_files:
-        self.assertTrue(os.path.isfile(rbf))
+        capture_returncode = capture_proc.returncode
+        stop_pinging(ping_procs)
+        self.assertEqual(capture_returncode, 0)
+
+        if (capture_returncode == 0):
+            self.checkPacketCount(0)
+    return check_capture_read_filter_real
+
+@fixtures.fixture
+def check_capture_snapshot_len(capture_interface, cmd_tshark):
+    def check_capture_snapshot_len_real(self, cmd=None):
+        if not config.args_ping:
+            self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+        self.assertIsNotNone(cmd)
+        ping_procs = start_pinging(self)
+        testout_file = self.filename_from_id(testout_pcap)
+        capture_proc = self.runProcess(capture_command(cmd,
+            '-i', capture_interface,
+            '-p',
+            '-w', testout_file,
+            '-s', str(snapshot_len),
+            '-a', 'duration:{}'.format(capture_duration),
+            '-f', 'icmp || icmp6',
+        ))
+        capture_returncode = capture_proc.returncode
+        stop_pinging(ping_procs)
+        self.assertEqual(capture_returncode, 0)
+        self.assertTrue(os.path.isfile(testout_file))
+
+        # Use tshark to filter out all packets larger than 68 bytes.
+        testout2_file = self.filename_from_id('testout2.pcap')
+
+        filter_proc = self.runProcess((cmd_tshark,
+            '-r', testout_file,
+            '-w', testout2_file,
+            '-Y', 'frame.cap_len>{}'.format(snapshot_len),
+        ))
+        filter_returncode = filter_proc.returncode
+        self.assertEqual(capture_returncode, 0)
+        if (capture_returncode == 0):
+            self.checkPacketCount(0, cap_file=testout2_file)
+    return check_capture_snapshot_len_real
+
+
+@fixtures.fixture
+def check_dumpcap_autostop_stdin(cmd_dumpcap):
+    def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
+        # Similar to check_capture_stdin.
+        testout_file = self.filename_from_id(testout_pcap)
+        cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+        condition='oops:invalid'
+
+        self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+        self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+
+        if packets is not None:
+            condition = 'packets:{}'.format(packets)
+        elif filesize is not None:
+            condition = 'filesize:{}'.format(filesize)
+
+        capture_cmd = ' '.join((cmd_dumpcap,
+            '-i', '-',
+            '-w', testout_file,
+            '-a', condition,
+        ))
+        pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+        pipe_returncode = pipe_proc.returncode
+        self.assertEqual(pipe_returncode, 0)
+        self.assertTrue(os.path.isfile(testout_file))
+        if (pipe_returncode != 0):
+            return
+
         if packets is not None:
-            self.checkPacketCount(packets, cap_file=rbf)
+            self.checkPacketCount(packets)
         elif filesize is not None:
-            capturekb = os.path.getsize(rbf) / 1000
+            capturekb = os.path.getsize(testout_file) / 1000
             self.assertGreaterEqual(capturekb, filesize)
+    return check_dumpcap_autostop_stdin_real
+
+
+@fixtures.fixture
+def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
+    def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
+        # Similar to check_capture_stdin.
+        rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
+        testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
+        testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
+        cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+        condition='oops:invalid'
+
+        self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+        self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
 
+        if packets is not None:
+            condition = 'packets:{}'.format(packets)
+        elif filesize is not None:
+            condition = 'filesize:{}'.format(filesize)
+
+        capture_cmd = ' '.join((cmd_dumpcap,
+            '-i', '-',
+            '-w', testout_file,
+            '-a', 'files:2',
+            '-b', condition,
+        ))
+        pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+        pipe_returncode = pipe_proc.returncode
+        self.assertEqual(pipe_returncode, 0)
+        if (pipe_returncode != 0):
+            return
+
+        rb_files = glob.glob(testout_glob)
+        for rbf in rb_files:
+            self.cleanup_files.append(rbf)
+
+        self.assertEqual(len(rb_files), 2)
+
+        for rbf in rb_files:
+            self.assertTrue(os.path.isfile(rbf))
+            if packets is not None:
+                self.checkPacketCount(packets, cap_file=rbf)
+            elif filesize is not None:
+                capturekb = os.path.getsize(rbf) / 1000
+                self.assertGreaterEqual(capturekb, filesize)
+    return check_dumpcap_ringbuffer_stdin_real
+
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
 class case_wireshark_capture(subprocesstest.SubprocessTestCase):
-    def test_wireshark_capture_10_packets_to_file(self):
+    def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
         '''Capture 10 packets from the network to a file using Wireshark'''
-        check_capture_10_packets(self, cmd=config.cmd_wireshark)
+        check_capture_10_packets(self, cmd=wireshark_k)
 
     # Wireshark doesn't currently support writing to stdout while capturing.
-    # def test_wireshark_capture_10_packets_to_stdout(self):
+    # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
     #     '''Capture 10 packets from the network to stdout using Wireshark'''
-    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
+    #     check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
 
-    def test_wireshark_capture_from_fifo(self):
+    def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
         '''Capture from a fifo using Wireshark'''
-        check_capture_fifo(self, cmd=config.cmd_wireshark)
+        check_capture_fifo(self, cmd=wireshark_k)
 
-    def test_wireshark_capture_from_stdin(self):
+    def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
         '''Capture from stdin using Wireshark'''
-        check_capture_stdin(self, cmd=config.cmd_wireshark)
+        check_capture_stdin(self, cmd=wireshark_k)
 
-    def test_wireshark_capture_snapshot_len(self):
+    def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
         '''Capture truncated packets using Wireshark'''
-        check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
+        check_capture_snapshot_len(self, cmd=wireshark_k)
 
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
 class case_tshark_capture(subprocesstest.SubprocessTestCase):
-    def test_tshark_capture_10_packets_to_file(self):
+    def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
         '''Capture 10 packets from the network to a file using TShark'''
-        check_capture_10_packets(self, cmd=config.cmd_tshark)
+        check_capture_10_packets(self, cmd=cmd_tshark)
 
-    def test_tshark_capture_10_packets_to_stdout(self):
+    def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
         '''Capture 10 packets from the network to stdout using TShark'''
-        check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+        check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)
 
-    def test_tshark_capture_from_fifo(self):
+    def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
         '''Capture from a fifo using TShark'''
-        check_capture_fifo(self, cmd=config.cmd_tshark)
+        check_capture_fifo(self, cmd=cmd_tshark)
 
-    def test_tshark_capture_from_stdin(self):
+    def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
         '''Capture from stdin using TShark'''
-        check_capture_stdin(self, cmd=config.cmd_tshark)
+        check_capture_stdin(self, cmd=cmd_tshark)
 
-    def test_tshark_capture_snapshot_len(self):
+    def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
         '''Capture truncated packets using TShark'''
-        check_capture_snapshot_len(self, cmd=config.cmd_tshark)
+        check_capture_snapshot_len(self, cmd=cmd_tshark)
+
 
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
 class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
-    def test_dumpcap_capture_10_packets_to_file(self):
+    def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
         '''Capture 10 packets from the network to a file using Dumpcap'''
-        check_capture_10_packets(self, cmd=config.cmd_dumpcap)
+        check_capture_10_packets(self, cmd=cmd_dumpcap)
 
-    def test_dumpcap_capture_10_packets_to_stdout(self):
+    def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
         '''Capture 10 packets from the network to stdout using Dumpcap'''
-        check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
+        check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)
 
-    def test_dumpcap_capture_from_fifo(self):
+    def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
         '''Capture from a fifo using Dumpcap'''
-        check_capture_fifo(self, cmd=config.cmd_dumpcap)
+        check_capture_fifo(self, cmd=cmd_dumpcap)
 
-    def test_dumpcap_capture_from_stdin(self):
+    def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
         '''Capture from stdin using Dumpcap'''
-        check_capture_stdin(self, cmd=config.cmd_dumpcap)
+        check_capture_stdin(self, cmd=cmd_dumpcap)
 
-    def test_dumpcap_capture_snapshot_len(self):
+    def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
         '''Capture truncated packets using Dumpcap'''
-        check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
+        check_capture_snapshot_len(self, cmd=cmd_dumpcap)
 
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
 class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
     # duration, filesize, packets, files
-    def test_dumpcap_autostop_filesize(self):
+    def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
         '''Capture from stdin using Dumpcap until we reach a file size limit'''
         check_dumpcap_autostop_stdin(self, filesize=15)
 
-    def test_dumpcap_autostop_packets(self):
+    def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
         '''Capture from stdin using Dumpcap until we reach a packet limit'''
         check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
 
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
 class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
     # duration, interval, filesize, packets, files
     # Need a function that finds ringbuffer file names.
-    def test_dumpcap_ringbuffer_filesize(self):
+    def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
         '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
         check_dumpcap_ringbuffer_stdin(self, filesize=15)
 
-    def test_dumpcap_ringbuffer_packets(self):
+    def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
         '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
         check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
index 9be3e13..722769b 100644 (file)
@@ -9,10 +9,8 @@
 #
 '''Command line option tests'''
 
-import config
 import subprocess
 import subprocesstest
-import unittest
 import fixtures
 
 #glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
@@ -44,36 +42,31 @@ class case_dumpcap_options(subprocesstest.SubprocessTestCase):
             self.assertIn(process.returncode, valid_returns)
 
 
+@fixtures.mark_usefixtures('base_env')
 @fixtures.uses_fixtures
 class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
-    def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, base_env):
+    def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface):
         '''Invalid capture filter'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_filter = '__invalid_protocol'
         # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
-        self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), env=base_env)
+        self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file))
         self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
 
-    def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, base_env):
+    def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface):
         '''Invalid capture interface name'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_interface = '__invalid_interface'
         # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
-        self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), env=base_env)
+        self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
         self.assertTrue(self.grepOutput('The capture session could not be initiated'))
 
-    def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, base_env):
+    def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface):
         '''Invalid capture interface index'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_index = '0'
         # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
-        self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), env=base_env)
+        self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
         self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
 
 
@@ -106,8 +99,9 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
             self.assertRun((cmd_tshark, '-' + char_arg))
 
     # XXX Should we generate individual test functions instead of looping?
-    def test_tshark_interface_chars(self, cmd_tshark):
+    def test_tshark_interface_chars(self, cmd_tshark, cmd_dumpcap):
         '''Valid tshark parameters requiring capture permissions'''
+        # These options require dumpcap
         valid_returns = [self.exit_ok, self.exit_error]
         for char_arg in 'DL':
             process = self.runProcess((cmd_tshark, '-' + char_arg))
@@ -117,30 +111,24 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
 @fixtures.mark_usefixtures('test_env')
 @fixtures.uses_fixtures
 class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
-    def test_tshark_invalid_capfilter(self, cmd_tshark):
+    def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface):
         '''Invalid capture filter'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_filter = '__invalid_protocol'
         # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
         self.runProcess((cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
         self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
 
-    def test_tshark_invalid_interface_name(self, cmd_tshark):
+    def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface):
         '''Invalid capture interface name'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_interface = '__invalid_interface'
         # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
         self.runProcess((cmd_tshark, '-i', invalid_interface, '-w', testout_file))
         self.assertTrue(self.grepOutput('The capture session could not be initiated'))
 
-    def test_tshark_invalid_interface_index(self, cmd_tshark):
+    def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface):
         '''Invalid capture interface index'''
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
         invalid_index = '0'
         # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
         testout_file = self.filename_from_id(testout_pcap)
@@ -151,9 +139,7 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
 @fixtures.mark_usefixtures('test_env')
 @fixtures.uses_fixtures
 class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase):
-    def test_tshark_valid_name_resolution(self, cmd_tshark):
-        if not config.canCapture():
-            self.skipTest('Test requires capture privileges and an interface.')
+    def test_tshark_valid_name_resolution(self, cmd_tshark, capture_interface):
         # $TSHARK -N mnNtdv -a duration:1 > ./testout.txt 2>&1
         self.assertRun((cmd_tshark, '-N', 'mnNtdv', '-a', 'duration: 1'))
 
index 86be14d..1d482df 100644 (file)
@@ -34,29 +34,29 @@ def fileformats_baseline_str(dirs):
 class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
     def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs microsecond pcap stdin'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess(' '.join((cmd_tshark,
                 '-r', '-',
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                 '<', capture_file('dhcp.pcap')
-                , shell=True),
+                )),
             shell=True)
         self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
 
     def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs nanosecond pcap stdin'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess(' '.join((cmd_tshark,
                 '-r', '-',
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
                 '<', capture_file('dhcp-nanosecond.pcap')
-                , shell=True),
+                )),
             shell=True)
         self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
 
     def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs nanosecond pcap direct'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess((cmd_tshark,
                 '-r', capture_file('dhcp-nanosecond.pcap'),
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -70,18 +70,18 @@ class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
 class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
     def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs microsecond pcapng stdin'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess(' '.join((cmd_tshark,
                 '-r', '-',
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
                 '<', capture_file('dhcp.pcapng')
-                , shell=True),
+                )),
             shell=True)
         self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
 
     def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs microsecond pcapng direct'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess((cmd_tshark,
                 '-r', capture_file('dhcp.pcapng'),
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -91,18 +91,18 @@ class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
 
     def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs nanosecond pcapng stdin'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess(' '.join((cmd_tshark,
                 '-r', '-',
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
                 '<', capture_file('dhcp-nanosecond.pcapng')
-                , shell=True),
+                )),
             shell=True)
         self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
 
     def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
         '''Microsecond pcap direct vs nanosecond pcapng direct'''
-        capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+        capture_proc = self.runProcess((cmd_tshark,
                 '-r', capture_file('dhcp-nanosecond.pcapng'),
                 '-Tfields',
                 '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
index 9561b64..9b0fa9c 100644 (file)
@@ -47,7 +47,7 @@ def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout
         io_proc = self.runProcess(stdout_cmd, shell=True)
     else: # direct->direct
         # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1
-        io_proc = self.runProcess(subprocesstest.capture_command(cmd,
+        io_proc = self.runProcess((cmd,
             '-r', capture_file('dhcp.pcap'),
             '-w', testout_file,
         ))
index 94621ed..6200dcf 100644 (file)
@@ -11,7 +11,6 @@
 #
 '''EPAN unit tests'''
 
-import config
 import difflib
 import os.path
 import re
@@ -53,7 +52,7 @@ class case_unittests(subprocesstest.SubprocessTestCase):
         '''fieldcount'''
         self.assertRun((cmd_tshark, '-G', 'fieldcount'), env=test_env)
 
-    def test_unit_ctest_coverage(self):
+    def test_unit_ctest_coverage(self, all_test_groups):
         '''Make sure CTest runs all of our tests.'''
         with open(os.path.join(os.path.dirname(__file__), '..', 'CMakeLists.txt')) as cml_fd:
             group_re = re.compile(r'set *\( *_test_group_list')
@@ -68,8 +67,8 @@ class case_unittests(subprocesstest.SubprocessTestCase):
                         break
                     cml_groups.append(cml_line.strip())
         cml_groups.sort()
-        if not config.all_groups == cml_groups:
-            diff = '\n'.join(list(difflib.unified_diff(config.all_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
+        if not all_test_groups == cml_groups:
+            diff = '\n'.join(list(difflib.unified_diff(all_test_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
             self.fail("CMakeLists.txt doesn't test all available groups:\n" + diff)
 
 
index c81f14c..f775bae 100755 (executable)
 # To do:
 # - Avoid printing Python tracebacks when we assert? It looks like we'd need
 #   to override unittest.TextTestResult.addFailure().
-# - Remove BIN_PATH/hosts via config.tearDownHostFiles + case_name_resolution.tearDownClass?
 
 
 import argparse
-import config
 import os.path
 import sys
 import unittest
+import fixtures
 # Required to make fixtures available to tests!
 import fixtures_ws
 
+_all_test_groups = None
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+    return _all_test_groups
+
 def find_test_ids(suite, all_ids):
     if hasattr(suite, '__iter__'):
         for s in suite:
@@ -47,10 +52,9 @@ def main():
 
     parser = argparse.ArgumentParser(description='Wireshark unit tests')
     cap_group = parser.add_mutually_exclusive_group()
-    cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests')
     cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests')
-    cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name')
-    parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.')
+    cap_group.add_argument('-i', '--capture-interface', help='Capture interface index or name')
+    parser.add_argument('-p', '--program-path', default=os.path.curdir, help='Path to Wireshark executables.')
     list_group = parser.add_mutually_exclusive_group()
     list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.')
     list_group.add_argument('--list-suites', action='store_true', help='List all suites.')
@@ -60,14 +64,6 @@ def main():
     parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".')
     args = parser.parse_args()
 
-    if args.enable_capture:
-        config.setCanCapture(True)
-    elif args.disable_capture:
-        config.setCanCapture(False)
-
-    if args.capture_interface:
-        config.setCaptureInterface(args.capture_interface[0])
-
     all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*')
 
     all_ids = []
@@ -96,8 +92,7 @@ def main():
     for aid in all_ids:
         aparts = aid.split('.')
         all_suites |= {aparts[0]}
-    config.all_suites = list(all_suites)
-    config.all_suites.sort()
+    all_suites = sorted(all_suites)
 
     all_groups = set()
     for aid in all_ids:
@@ -106,15 +101,16 @@ def main():
             all_groups |= {'.'.join(aparts[:2])}
         else:
             all_groups |= {aparts[0]}
-    config.all_groups = list(all_groups)
-    config.all_groups.sort()
+    all_groups = sorted(all_groups)
+    global _all_test_groups
+    _all_test_groups = all_groups
 
     if args.list_suites:
-        print('\n'.join(config.all_suites))
+        print('\n'.join(all_suites))
         sys.exit(0)
 
     if args.list_groups:
-        print('\n'.join(config.all_groups))
+        print('\n'.join(all_groups))
         sys.exit(0)
 
     if args.list_cases:
@@ -125,13 +121,6 @@ def main():
         print('\n'.join(list(cases)))
         sys.exit(0)
 
-    program_path = args.program_path[0]
-    if not config.setProgramPath(program_path):
-        print('One or more required executables not found at {}\n'.format(program_path))
-        parser.print_usage()
-        sys.exit(1)
-
-    #
     if sys.stdout.encoding != 'UTF-8':
         import codecs
         import locale
@@ -142,7 +131,7 @@ def main():
     run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
     runner = unittest.TextTestRunner(verbosity=args.verbose)
     # for unittest compatibility (not needed with pytest)
-    fixtures_ws.fixtures.create_session()
+    fixtures_ws.fixtures.create_session(args)
     try:
         test_result = runner.run(run_suite)
     finally: