64cba38aecf9316d605bf8cf21b281d9f209a772
[metze/wireshark/wip.git] / test / suite_sharkd.py
1 #
2 # -*- coding: utf-8 -*-
3 # Wireshark tests
4 # By Gerald Combs <gerald@wireshark.org>
5 #
6 # SPDX-License-Identifier: GPL-2.0-or-later
7 #
8 '''sharkd tests'''
9
10 import config
11 import json
12 import os.path
13 import subprocess
14 import subprocesstest
15 import sys
16 import unittest
17
18 dhcp_pcap = os.path.join(config.capture_dir, 'dhcp.pcap')
19
20 class case_sharkd(subprocesstest.SubprocessTestCase):
21     def test_sharkd_hello_no_pcap(self):
22         '''sharkd hello message, no capture file'''
23         sharkd_proc = self.startProcess((config.cmd_sharkd, '-'),
24             stdin=subprocess.PIPE
25         )
26
27         sharkd_commands = b'{"req":"status"}\n'
28         sharkd_proc.stdin.write(sharkd_commands)
29         self.waitProcess(sharkd_proc)
30
31         self.assertEqual(self.countOutput('Hello in child.', count_stdout=False, count_stderr=True), 1, 'No hello message.')
32
33         try:
34             jdata = json.loads(sharkd_proc.stdout_str)
35             self.assertEqual(jdata['duration'], 0.0, 'Missing duration.')
36         except:
37             self.fail('Invalid JSON: "{}"'.format(sharkd_proc.stdout_str))
38
39     def test_sharkd_hello_dhcp_pcap(self):
40         '''sharkd hello message, simple capture file'''
41         sharkd_proc = self.startProcess((config.cmd_sharkd, '-'),
42             stdin=subprocess.PIPE
43         )
44
45         sharkd_commands = b'{"req":"load","file":'
46         sharkd_commands += json.dumps(dhcp_pcap).encode('utf8')
47         sharkd_commands += b'}\n'
48         sharkd_commands += b'{"req":"status"}\n'
49         sharkd_commands += b'{"req":"frames"}\n'
50
51         sharkd_proc.stdin.write(sharkd_commands)
52         self.waitProcess(sharkd_proc)
53
54         has_dhcp = False
55         for line in sharkd_proc.stdout_str.splitlines():
56             line = line.strip()
57             if not line: continue
58             try:
59                 jdata = json.loads(line)
60             except:
61                 self.fail('Invalid JSON for "{}"'.format(line))
62
63             try:
64                 if 'DHCP' in jdata[0]['c']:
65                     has_dhcp = True
66             except:
67                 pass
68
69         self.assertTrue(has_dhcp, 'Failed to find DHCP in JSON output')