+ def test_text2pcap_doc_no_line_limit(self, check_rawip):
+ '''
+ Verify: There is no limit on the width or number of bytes per line and
+ Bytes/hex numbers can be uppercase or lowercase.
+ '''
+ pdata = "0000 45 00 00 21 00 01 00 00 40 11\n" \
+ "000A 7C C9 7F 00 00 01" \
+ " 7f 00 00 01 ff 98 00 13 00 0d b5 48 66 69 72 73\n" \
+ "0020 74\n"
+ check_rawip(pdata, 1, 33)
+
+ def test_text2pcap_doc_ignore_text(self, check_rawip):
+ '''
+ Verify: the text dump at the end of the line is ignored. Any hex numbers
+ in this text are also ignored. Any lines of text between the bytestring
+ lines is ignored. Any line where the first non-whitespace character is
+ '#' will be ignored as a comment.
+ '''
+ pdata = "0000 45 00 00 21 00 01 00 00 40 11 7c c9 7f 00 00 01 bad\n" \
+ "0010 7f 00 00 01 ff 98 00 13 00 0d b5 48 66 69 72 73 - 42\n" \
+ "0020 74\n" \
+ "0021\n" \
+ "That 0021 should probably be ignored as it this: 00 20\n" \
+ "0000 45 00 00 22 00 01 00 00 40 11 7c c8 7f 00 00 01\n" \
+ "0010 7f 00 00 01 ff 99 00 13 00 0e bc e9 73 65 63 6f ...\n" \
+ " \t# 0020 12 34 56<-- comment, ignore this!\n" \
+ "0020 6e 64\n" \
+ "12 34 56 78 90 # ignore this due to missing offset!\n"
+ check_rawip(pdata, 2, 67)
+
+ def test_text2pcap_doc_leading_text_ignored(self, check_rawip):
+ '''
+ Verify: Any test before the offset is ignored, including email
+ forwarding characters '>'. An offset is a hex number longer than two
+ characters. An offset of zero is indicative of starting a new packet.
+ '''
+ pdata = "> >> 000 45 00 00 21 00 01 00 00 40 11 7c c9 7f 00 00 01\n" \
+ "> >> 010 7f 00 00 01 ff 98 00 13 00 0d b5 48 66 69 72 73\n" \
+ "> >> 020 74\n" \
+ "> >> 000 45 00 00 22 00 01 00 00 40 11 7c c8 7f 00 00 01\n" \
+ "> >> 010 7f 00 00 01 ff 99 00 13 00 0e bc e9 73 65 63 6f\n" \
+ "> >> 020 6e 64\n"
+ check_rawip(pdata, 2, 67)
+
+ def test_text2pcap_doc_require_offset(self, check_rawip):
+ '''Any line which has only bytes without a leading offset is ignored.'''
+ pdata = "45 00 00 21 00 01 00 00 40 11 7c c9 7f 00 00 01\n" \
+ "7f 00 00 01 ff 98 00 13 00 0d b5 48 66 69 72 73\n"
+ check_rawip(pdata, 0, 0)
+
+ def test_text2pcap_eol_missing(self, check_rawip):
+ '''Verify that the last LF can be missing.'''
+ pdata = "0000 45 00 00 21 00 01 00 00 40 11 7c c9 7f 00 00 01\n" \
+ "0010 7f 00 00 01 ff 98 00 13 00 0d b5 48 66 69 72 73\n" \
+ "0020 74"
+ check_rawip(pdata, 1, 33)
+
+
+@fixtures.fixture
+def run_text2pcap_capinfos_tshark(cmd_text2pcap, cmd_tshark, request):
+ def run_text2pcap_capinfos_tshark_real(content, args):
+ test = request.instance
+ testin_file = test.filename_from_id(testin_txt)
+ testout_file = test.filename_from_id(testout_pcap)
+
+ with open(testin_file, "w") as f:
+ f.write(content)
+ test.assertRun((cmd_text2pcap,) + args + (testin_file, testout_file))
+
+ capinfo = get_capinfos_cmp_info(check_capinfos_info(test, testout_file))
+
+ test.assertRun((cmd_tshark, '-q', '-z', 'expert,warn',
+ '-o', 'udp.check_checksum: TRUE',
+ '-o', 'tcp.check_checksum: TRUE',
+ '-o', 'sctp.checksum:TRUE',
+ '-r', testout_file))
+ capinfo['expert'] = test.processes[-1].stdout_str
+ return capinfo
+ return run_text2pcap_capinfos_tshark_real
+
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
+class case_text2pcap_headers(subprocesstest.SubprocessTestCase):
+ '''Test TCP, UDP or SCTP header without -4 or -6 option'''
+ maxDiff = None
+
+ def test_text2pcap_tcp(self, run_text2pcap_capinfos_tshark):
+ '''Test TCP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 60, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000: ff ff ff ff\n", ("-T", "1234,1234")))
+
+ def test_text2pcap_udp(self, run_text2pcap_capinfos_tshark):
+ '''Test UDP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 60, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000: ff ff ff ff\n", ("-u", "1234,1234")))
+
+ def test_text2pcap_sctp(self, run_text2pcap_capinfos_tshark):
+ '''Test SCTP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 70, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000 00 03 00 18 00 00 00 00 00 00 00 00 00 00 00 03\n" +
+ "0010 01 00 03 03 00 00 00 08\n",
+ ("-s", "2905,2905,3")))
+
+ def test_text2pcap_sctp_data(self, run_text2pcap_capinfos_tshark):
+ '''Test SCTP DATA over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 70, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000: 01 00 03 03 00 00 00 08\n",
+ ("-S", "2905,2905,3")))
+
+
+@fixtures.fixture
+def run_text2pcap_ipv4(run_text2pcap_capinfos_tshark):
+ def run_text2pcap_ipv4_real(content, args):
+ return run_text2pcap_capinfos_tshark(content,
+ ("-4", "127.0.0.1,127.0.0.1") + args)
+ return run_text2pcap_ipv4_real
+
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
+class case_text2pcap_ipv4(subprocesstest.SubprocessTestCase):
+ '''Test TCP, UDP or SCTP header with -4 option'''
+ maxDiff = None
+
+ def test_text2pcap_ipv4_tcp(self, run_text2pcap_ipv4):
+ '''Test TCP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 60, 'expert': ''},
+ run_text2pcap_ipv4("0000: ff ff ff ff\n", ("-T", "1234,1234")))
+
+ def test_text2pcap_ipv4_udp(self, run_text2pcap_ipv4):
+ '''Test UDP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 60, 'expert': ''},
+ run_text2pcap_ipv4("0000: ff ff ff ff\n", ("-u", "1234,1234")))
+
+ def test_text2pcap_ipv4_sctp(self, run_text2pcap_ipv4):
+ '''Test SCTP over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 70, 'expert': ''},
+ run_text2pcap_ipv4(
+ "0000 00 03 00 18 00 00 00 00 00 00 00 00 00 00 00 03\n" +
+ "0010 01 00 03 03 00 00 00 08\n",
+ ("-s", "2905,2905,3")))
+
+ def test_text2pcap_ipv4_sctp_data(self, run_text2pcap_ipv4):
+ '''Test SCTP DATA over IPv4'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 70, 'expert': ''},
+ run_text2pcap_ipv4("0000: 01 00 03 03 00 00 00 08\n",
+ ("-S", "2905,2905,3")))
+
+
+@fixtures.fixture
+def run_text2pcap_ipv6(cmd_tshark, run_text2pcap_capinfos_tshark, request):
+ self = request.instance
+ def run_text2pcap_ipv6_real(content, text2pcap_args, tshark_args = ()):
+ #Run the common text2pcap tests
+ result = run_text2pcap_capinfos_tshark(content,
+ ("-6", "::1,::1") + text2pcap_args)
+
+ #Decode the output pcap in JSON format
+ self.assertRun((cmd_tshark, '-T', 'json',
+ '-r', self.filename_from_id(testout_pcap)) + tshark_args)
+ data = json.loads(self.processes[-1].stdout_str)
+
+ #Add IPv6 payload length and payload length tree to the result dict
+ ipv6 = data[0]['_source']['layers']['ipv6']
+ result['ipv6'] = {
+ 'plen': ipv6.get('ipv6.plen', None),
+ 'plen_tree': ipv6.get('ipv6.plen_tree', None)}
+ return result
+ return run_text2pcap_ipv6_real
+
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
+class case_text2pcap_ipv6(subprocesstest.SubprocessTestCase):
+ '''Test TCP, UDP or SCTP header with -6 option'''
+ maxDiff = None
+
+ def test_text2pcap_ipv6_tcp(self, run_text2pcap_ipv6):
+ '''Test TCP over IPv6'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 78, 'expert': '',
+ 'ipv6': {'plen': '24', 'plen_tree': None}},
+ run_text2pcap_ipv6("0000: ff ff ff ff\n", ("-T", "1234,1234")))
+
+ def test_text2pcap_ipv6_udp(self, run_text2pcap_ipv6):
+ '''Test UDP over IPv6'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 66, 'expert': '',
+ 'ipv6': {'plen': '12', 'plen_tree': None}},
+ run_text2pcap_ipv6("0000: ff ff ff ff\n", ("-u", "1234,1234")))
+
+ def test_text2pcap_ipv6_sctp(self, run_text2pcap_ipv6):
+ '''Test SCTP over IPv6'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 90, 'expert': '',
+ 'ipv6': {'plen': '36', 'plen_tree': None}},
+ run_text2pcap_ipv6(
+ "0000 00 03 00 18 00 00 00 00 00 00 00 00 00 00 00 03\n" +
+ "0010 01 00 03 03 00 00 00 08\n",
+ ("-s", "2905,2905,3")))
+
+ def test_text2pcap_ipv6_sctp_data(self, run_text2pcap_ipv6):
+ '''Test SCTP DATA over IPv6'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 90, 'expert': '',
+ 'ipv6': {'plen': '36', 'plen_tree': None}},
+ run_text2pcap_ipv6("0000: 01 00 03 03 00 00 00 08\n",
+ ("-S", "2905,2905,3")))
+
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
+class case_text2pcap_i_proto(subprocesstest.SubprocessTestCase):
+ '''Test -i <proto> for IPv4 and IPv6'''
+ maxDiff = None
+
+ def test_text2pcap_i_icmp(self, run_text2pcap_capinfos_tshark):
+ '''Test -i <proto> without -4 or -6'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 98, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000 08 00 bb b3 d7 3b 00 00 51 a7 d6 7d 00 04 51 e4\n" +
+ "0010 08 09 0a 0b 0c 0d 0e 0f 10 11 12 13 14 15 16 17\n" +
+ "0020 18 19 1a 1b 1c 1d 1e 1f 20 21 22 23 24 25 26 27\n" +
+ "0030 28 29 2a 2b 2c 2d 2e 2f 30 31 32 33 34 35 36 37\n",
+ ("-i", "1")))
+
+ def test_text2pcap_i_icmp_ipv4(self, run_text2pcap_capinfos_tshark):
+ '''Test -i <proto> with IPv4 (-4) header'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 98, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000 08 00 bb b3 d7 3b 00 00 51 a7 d6 7d 00 04 51 e4\n" +
+ "0010 08 09 0a 0b 0c 0d 0e 0f 10 11 12 13 14 15 16 17\n" +
+ "0020 18 19 1a 1b 1c 1d 1e 1f 20 21 22 23 24 25 26 27\n" +
+ "0030 28 29 2a 2b 2c 2d 2e 2f 30 31 32 33 34 35 36 37\n",
+ ("-i", "1", "-4", "127.0.0.1,127.0.0.1")))
+
+ def test_text2pcap_i_icmpv6_ipv6(self, run_text2pcap_capinfos_tshark):
+ '''Test -i <proto> with IPv6 (-6) header'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 86, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000 87 00 f2 62 00 00 00 00 fe 80 00 00 00 00 00 00\n" +
+ "0010 00 00 00 00 00 00 00 02 01 01 52 54 00 12 34 56\n",
+ ("-i", "58", "-6", "::1,::1")))
+
+ def test_text2pcap_i_sctp_ipv6(self, run_text2pcap_capinfos_tshark):
+ '''Test -i <proto> with IPv6 (-6) header'''
+ self.assertEqual({'encapsulation': 'Ethernet', 'packets': 1,
+ 'datasize': 90, 'expert': ''},
+ run_text2pcap_capinfos_tshark(
+ "0000 0b 59 0b 59 00 00 00 00 26 98 58 51 00 03 00 18\n" +
+ "0010 00 00 00 00 00 00 00 00 00 00 00 03 01 00 03 03\n" +
+ "0020 00 00 00 08\n",
+ ("-i", "132", "-6", "::1,::1")))
+
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
+class case_text2pcap_other_options(subprocesstest.SubprocessTestCase):
+ '''Test other command line options'''
+ def test_text2pcap_option_N(self, cmd_text2pcap, cmd_tshark, capture_file):
+ '''Test -N <intf-name> option'''
+ testin_file = self.filename_from_id(testin_txt)
+ testout_file = self.filename_from_id(testout_pcapng)
+
+ with open(testin_file, 'w') as f:
+ f.write("0000 00\n")
+ f.close()
+ self.assertRun((cmd_text2pcap, "-n", "-N", "your-interface-name", testin_file, testout_file))
+ proc = self.assertRun((cmd_tshark, "-r", testout_file, "-Tfields", "-eframe.interface_name", "-c1"))
+ self.assertEqual(proc.stdout_str.rstrip(), "your-interface-name")